This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 556dd6fbc00f713e1f2f10e441e1121c50fadad1
Author: Yi Zhang <[email protected]>
AuthorDate: Thu Dec 18 14:05:08 2025 +0800

    [FLINK-38758][runtime] Use JobStatus instead of ApplicationStatus in 
JobResult
---
 .../ApplicationDispatcherBootstrap.java            |  5 +--
 .../UnsuccessfulExecutionException.java            | 26 +++++++++------
 .../client/program/rest/RestClusterClient.java     |  3 +-
 .../ApplicationDispatcherBootstrapITCase.java      |  8 ++---
 .../ApplicationDispatcherBootstrapTest.java        | 31 +++++++----------
 .../application/JobStatusPollingUtilsTest.java     |  5 ++-
 .../client/program/rest/RestClusterClientTest.java |  7 ++--
 .../cleanup/CheckpointResourcesCleanupRunner.java  |  2 +-
 .../apache/flink/runtime/jobmaster/JobResult.java  | 39 +++++++++++-----------
 .../rest/messages/json/JobResultDeserializer.java  | 14 +++++---
 .../rest/messages/json/JobResultSerializer.java    |  4 ++-
 .../flink/runtime/dispatcher/DispatcherTest.java   | 14 ++++----
 .../CheckpointResourcesCleanupRunnerTest.java      | 17 ++++------
 .../ZooKeeperDefaultDispatcherRunnerTest.java      |  4 +--
 ...FileSystemJobResultStoreFileOperationsTest.java |  4 +--
 .../runtime/jobmanager/BlobsCleanupITCase.java     |  6 ++--
 .../JobMasterServiceLeadershipRunnerTest.java      |  4 +--
 .../LeaderChangeClusterComponentsTest.java         |  3 +-
 .../job/JobExecutionResultResponseBodyTest.java    | 10 +++---
 .../runtime/testutils/TestingJobResultStore.java   | 14 ++++----
 .../checkpointing/ChangelogRecoveryITCaseBase.java |  4 +--
 .../environment/RemoteStreamEnvironmentTest.java   |  3 +-
 22 files changed, 107 insertions(+), 120 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
index 97336c7b265..4290dcd4748 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
@@ -201,7 +201,8 @@ public class ApplicationDispatcherBootstrap implements 
DispatcherBootstrap {
     private Optional<ApplicationStatus> extractApplicationStatus(Throwable t) {
         final Optional<UnsuccessfulExecutionException> maybeException =
                 ExceptionUtils.findThrowable(t, 
UnsuccessfulExecutionException.class);
-        return maybeException.map(UnsuccessfulExecutionException::getStatus);
+        return maybeException.map(
+                exception -> 
ApplicationStatus.fromJobStatus(exception.getStatus().orElse(null)));
     }
 
     private CompletableFuture<Void> fixJobIdAndRunApplicationAsync(
@@ -377,7 +378,7 @@ public class ApplicationDispatcherBootstrap implements 
DispatcherBootstrap {
                     exception ->
                             new JobResult.Builder()
                                     .jobId(jobId)
-                                    
.applicationStatus(ApplicationStatus.UNKNOWN)
+                                    .jobStatus(null)
                                     .netRuntime(Long.MAX_VALUE)
                                     .build());
         }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/UnsuccessfulExecutionException.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/UnsuccessfulExecutionException.java
index bc360d5535c..b828c0e5f60 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/UnsuccessfulExecutionException.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/UnsuccessfulExecutionException.java
@@ -20,30 +20,34 @@ package org.apache.flink.client.deployment.application;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Exception that signals the failure of an application with a given {@link 
ApplicationStatus}. */
+/** Exception that signals the failure of a job with a given {@link 
JobStatus}. */
 @Internal
 public class UnsuccessfulExecutionException extends JobExecutionException {
 
-    private final ApplicationStatus status;
+    @Nullable private final JobStatus status;
 
     public UnsuccessfulExecutionException(
             final JobID jobID,
-            final ApplicationStatus status,
+            @Nullable final JobStatus status,
             final String message,
             final Throwable cause) {
         super(jobID, message, cause);
-        this.status = checkNotNull(status);
+        this.status = status;
     }
 
-    public ApplicationStatus getStatus() {
-        return status;
+    public Optional<JobStatus> getStatus() {
+        return Optional.ofNullable(status);
     }
 
     public static UnsuccessfulExecutionException fromJobResult(
@@ -64,13 +68,13 @@ public class UnsuccessfulExecutionException extends 
JobExecutionException {
         } catch (Throwable t) {
 
             final JobID jobID = result.getJobId();
-            final ApplicationStatus status = result.getApplicationStatus();
+            final JobStatus status = result.getJobStatus().orElse(null);
 
-            return status == ApplicationStatus.CANCELED || status == 
ApplicationStatus.FAILED
+            return status == JobStatus.CANCELED || status == JobStatus.FAILED
                     ? new UnsuccessfulExecutionException(
-                            jobID, status, "Application Status: " + 
status.name(), t)
+                            jobID, status, "Job Status: " + status.name(), t)
                     : new UnsuccessfulExecutionException(
-                            jobID, ApplicationStatus.UNKNOWN, "Job failed for 
unknown reason.", t);
+                            jobID, null, "Job failed for unknown reason.", t);
         }
     }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 496612406b5..3cfe7dd7341 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -38,7 +38,6 @@ import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import 
org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.ClientHighAvailabilityServicesFactory;
 import 
org.apache.flink.runtime.highavailability.DefaultClientHighAvailabilityServicesFactory;
@@ -1027,7 +1026,7 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
                         })
                 .thenApply(
                         jobResult -> {
-                            if (jobResult.getApplicationStatus() == 
ApplicationStatus.UNKNOWN) {
+                            if (jobResult.getJobStatus().isEmpty()) {
                                 throw new JobStateUnknownException(
                                         String.format("Result for Job %s is 
UNKNOWN", jobId));
                             }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
index 3e0b3953c45..b5fb78bf94d 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
@@ -33,7 +33,6 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
 import 
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
 import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
@@ -131,9 +130,7 @@ class ApplicationDispatcherBootstrapITCase {
             final CompletableFuture<JobResult> firstJobResult = 
cluster.requestJobResult(jobId);
             haServices.revokeDispatcherLeadership();
             // make sure the leadership is revoked to avoid race conditions
-            assertThat(firstJobResult.get())
-                    .extracting(JobResult::getApplicationStatus)
-                    .isEqualTo(ApplicationStatus.UNKNOWN);
+            
assertThat(firstJobResult.get().getJobStatus().isPresent()).isFalse();
             haServices.grantDispatcherLeadership();
 
             // job is suspended, wait until it's running
@@ -145,8 +142,7 @@ class ApplicationDispatcherBootstrapITCase {
             // and wait for it to actually finish
             final JobResult secondJobResult = 
cluster.requestJobResult(jobId).get();
             assertThat(secondJobResult.isSuccess()).isTrue();
-            assertThat(secondJobResult.getApplicationStatus())
-                    .isEqualTo(ApplicationStatus.SUCCEEDED);
+            
assertThat(secondJobResult.getJobStatus().orElse(null)).isEqualTo(JobStatus.FINISHED);
 
             // the cluster should shut down automatically once the application 
completes
             awaitClusterStopped(cluster);
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
index 6bf77bfff84..47c5e13fe7b 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
@@ -55,6 +55,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -260,7 +262,7 @@ class ApplicationDispatcherBootstrapTest {
         final CompletableFuture<Void> applicationFuture = 
runApplication(dispatcherBuilder, 2);
         final UnsuccessfulExecutionException exception =
                 assertException(applicationFuture, 
UnsuccessfulExecutionException.class);
-        assertThat(exception.getStatus()).isEqualTo(ApplicationStatus.FAILED);
+        
assertThat(exception.getStatus().orElse(null)).isEqualTo(JobStatus.FAILED);
     }
 
     @Test
@@ -435,8 +437,7 @@ class ApplicationDispatcherBootstrapTest {
                         .setRequestJobResultFunction(
                                 jobId ->
                                         CompletableFuture.completedFuture(
-                                                createJobResult(
-                                                        jobId, 
ApplicationStatus.SUCCEEDED)))
+                                                createJobResult(jobId, 
JobStatus.FINISHED)))
                         .setClusterShutdownFunction(status -> 
shutdownFunction.get());
 
         // we're "listening" on this to be completed to verify that the error 
handler is called.
@@ -820,8 +821,7 @@ class ApplicationDispatcherBootstrapTest {
                                 jobId ->
                                         submitted.thenApply(
                                                 ignored ->
-                                                        createJobResult(
-                                                                jobId, 
ApplicationStatus.FAILED)))
+                                                        createJobResult(jobId, 
JobStatus.FAILED)))
                         .build();
 
         final ApplicationDispatcherBootstrap bootstrap =
@@ -886,10 +886,7 @@ class ApplicationDispatcherBootstrapTest {
                                 jobId -> 
CompletableFuture.completedFuture(jobStatus));
         if (jobStatus != JobStatus.RUNNING) {
             builder.setRequestJobResultFunction(
-                    jobID ->
-                            CompletableFuture.completedFuture(
-                                    createJobResult(
-                                            jobID, 
ApplicationStatus.fromJobStatus(jobStatus))));
+                    jobID -> 
CompletableFuture.completedFuture(createJobResult(jobID, jobStatus)));
         }
         return builder;
     }
@@ -979,25 +976,21 @@ class ApplicationDispatcherBootstrapTest {
     }
 
     private static JobResult createFailedJobResult(final JobID jobId) {
-        return createJobResult(jobId, ApplicationStatus.FAILED);
+        return createJobResult(jobId, JobStatus.FAILED);
     }
 
     private static JobResult createUnknownJobResult(final JobID jobId) {
-        return createJobResult(jobId, ApplicationStatus.UNKNOWN);
+        return createJobResult(jobId, null);
     }
 
     private static JobResult createJobResult(
-            final JobID jobID, final ApplicationStatus applicationStatus) {
+            final JobID jobID, @Nullable final JobStatus jobStatus) {
         JobResult.Builder builder =
-                new JobResult.Builder()
-                        .jobId(jobID)
-                        .netRuntime(2L)
-                        .applicationStatus(applicationStatus);
-        if (applicationStatus == ApplicationStatus.CANCELED) {
+                new 
JobResult.Builder().jobId(jobID).netRuntime(2L).jobStatus(jobStatus);
+        if (jobStatus == JobStatus.CANCELED) {
             builder.serializedThrowable(
                     new SerializedThrowable(new 
JobCancellationException(jobID, "Hello", null)));
-        } else if (applicationStatus == ApplicationStatus.FAILED
-                || applicationStatus == ApplicationStatus.UNKNOWN) {
+        } else if (jobStatus == JobStatus.FAILED || jobStatus == null) {
             builder.serializedThrowable(
                     new SerializedThrowable(new JobExecutionException(jobID, 
"bla bla bla")));
         }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/JobStatusPollingUtilsTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/JobStatusPollingUtilsTest.java
index 087829f7ce5..ed057bf3289 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/JobStatusPollingUtilsTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/JobStatusPollingUtilsTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.deployment.application;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.SerializedThrowable;
@@ -164,7 +163,7 @@ class JobStatusPollingUtilsTest {
         return new JobResult.Builder()
                 .jobId(jobId)
                 .netRuntime(2L)
-                .applicationStatus(ApplicationStatus.FAILED)
+                .jobStatus(JobStatus.FAILED)
                 .serializedThrowable(new SerializedThrowable(new 
Exception("bla bla bla")))
                 .build();
     }
@@ -173,7 +172,7 @@ class JobStatusPollingUtilsTest {
         return new JobResult.Builder()
                 .jobId(jobId)
                 .netRuntime(2L)
-                .applicationStatus(ApplicationStatus.SUCCEEDED)
+                .jobStatus(JobStatus.FINISHED)
                 .build();
     }
 }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 27e1835b263..71e00f38d55 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -35,7 +35,6 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
@@ -880,7 +879,7 @@ class RestClusterClientTest {
                         // On an UNKNOWN JobResult it should be retried
                         JobExecutionResultResponseBody.created(
                                 new JobResult.Builder()
-                                        
.applicationStatus(ApplicationStatus.UNKNOWN)
+                                        .jobStatus(null)
                                         .jobId(jobId)
                                         .netRuntime(Long.MAX_VALUE)
                                         .accumulatorResults(
@@ -891,7 +890,7 @@ class RestClusterClientTest {
                                         .build()),
                         JobExecutionResultResponseBody.created(
                                 new JobResult.Builder()
-                                        
.applicationStatus(ApplicationStatus.SUCCEEDED)
+                                        .jobStatus(JobStatus.FINISHED)
                                         .jobId(jobId)
                                         .netRuntime(Long.MAX_VALUE)
                                         .accumulatorResults(
@@ -902,7 +901,7 @@ class RestClusterClientTest {
                                         .build()),
                         JobExecutionResultResponseBody.created(
                                 new JobResult.Builder()
-                                        
.applicationStatus(ApplicationStatus.FAILED)
+                                        .jobStatus(JobStatus.FAILED)
                                         .jobId(jobId)
                                         .netRuntime(Long.MAX_VALUE)
                                         .serializedThrowable(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
index 529fd39d740..2aaa70876cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
@@ -216,7 +216,7 @@ public class CheckpointResourcesCleanupRunner implements 
JobManagerRunner {
     }
 
     private static JobStatus getJobStatus(JobResult jobResult) {
-        return jobResult.getApplicationStatus().deriveJobStatus();
+        return jobResult.getJobStatus().orElseThrow();
     }
 
     private static ExecutionGraphInfo generateExecutionGraphInfo(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
index cd8bd007fb1..b385b28da39 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
@@ -57,7 +56,8 @@ public class JobResult implements Serializable {
 
     private final JobID jobId;
 
-    private final ApplicationStatus applicationStatus;
+    /** Stores the job status, null if unknown. */
+    @Nullable private final JobStatus jobStatus;
 
     private final Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults;
 
@@ -68,15 +68,18 @@ public class JobResult implements Serializable {
 
     private JobResult(
             final JobID jobId,
-            final ApplicationStatus applicationStatus,
+            @Nullable final JobStatus jobStatus,
             final Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults,
             final long netRuntime,
             @Nullable final SerializedThrowable serializedThrowable) {
 
         checkArgument(netRuntime >= 0, "netRuntime must be greater than or 
equals 0");
+        checkArgument(
+                jobStatus == null || jobStatus.isGloballyTerminalState(),
+                "jobStatus must be globally terminal or unknow(null)");
 
         this.jobId = requireNonNull(jobId);
-        this.applicationStatus = requireNonNull(applicationStatus);
+        this.jobStatus = jobStatus;
         this.accumulatorResults = requireNonNull(accumulatorResults);
         this.netRuntime = netRuntime;
         this.serializedThrowable = serializedThrowable;
@@ -84,16 +87,16 @@ public class JobResult implements Serializable {
 
     /** Returns {@code true} if the job finished successfully. */
     public boolean isSuccess() {
-        return applicationStatus == ApplicationStatus.SUCCEEDED
-                || (applicationStatus == ApplicationStatus.UNKNOWN && 
serializedThrowable == null);
+        return jobStatus == JobStatus.FINISHED
+                || (jobStatus == null && serializedThrowable == null);
     }
 
     public JobID getJobId() {
         return jobId;
     }
 
-    public ApplicationStatus getApplicationStatus() {
-        return applicationStatus;
+    public Optional<JobStatus> getJobStatus() {
+        return Optional.ofNullable(jobStatus);
     }
 
     public Map<String, SerializedValue<OptionalFailure<Object>>> 
getAccumulatorResults() {
@@ -124,7 +127,7 @@ public class JobResult implements Serializable {
      */
     public JobExecutionResult toJobExecutionResult(ClassLoader classLoader)
             throws JobExecutionException, IOException, ClassNotFoundException {
-        if (applicationStatus == ApplicationStatus.SUCCEEDED) {
+        if (jobStatus == JobStatus.FINISHED) {
             return new JobExecutionResult(
                     jobId,
                     netRuntime,
@@ -140,17 +143,15 @@ public class JobResult implements Serializable {
 
             final JobExecutionException exception;
 
-            if (applicationStatus == ApplicationStatus.FAILED) {
+            if (jobStatus == JobStatus.FAILED) {
                 exception = new JobExecutionException(jobId, "Job execution 
failed.", cause);
-            } else if (applicationStatus == ApplicationStatus.CANCELED) {
+            } else if (jobStatus == JobStatus.CANCELED) {
                 exception = new JobCancellationException(jobId, "Job was 
cancelled.", cause);
             } else {
                 exception =
                         new JobExecutionException(
                                 jobId,
-                                "Job completed with illegal application 
status: "
-                                        + applicationStatus
-                                        + '.',
+                                "Job completed with illegal status: " + 
jobStatus + '.',
                                 cause);
             }
 
@@ -164,7 +165,7 @@ public class JobResult implements Serializable {
 
         private JobID jobId;
 
-        private ApplicationStatus applicationStatus = 
ApplicationStatus.UNKNOWN;
+        private JobStatus jobStatus;
 
         private Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults;
 
@@ -177,8 +178,8 @@ public class JobResult implements Serializable {
             return this;
         }
 
-        public Builder applicationStatus(final ApplicationStatus 
applicationStatus) {
-            this.applicationStatus = applicationStatus;
+        public Builder jobStatus(final JobStatus jobStatus) {
+            this.jobStatus = jobStatus;
             return this;
         }
 
@@ -201,7 +202,7 @@ public class JobResult implements Serializable {
         public JobResult build() {
             return new JobResult(
                     jobId,
-                    applicationStatus,
+                    jobStatus,
                     accumulatorResults == null ? Collections.emptyMap() : 
accumulatorResults,
                     netRuntime,
                     serializedThrowable);
@@ -233,7 +234,7 @@ public class JobResult implements Serializable {
         final JobResult.Builder builder = new JobResult.Builder();
         builder.jobId(jobId);
 
-        
builder.applicationStatus(ApplicationStatus.fromJobStatus(accessExecutionGraph.getState()));
+        builder.jobStatus(jobStatus.isGloballyTerminalState() ? jobStatus : 
null);
 
         final long netRuntime =
                 accessExecutionGraph.getStatusTimestamp(jobStatus)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
index 5eddeaf9b09..3ebd2aa6590 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages.json;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.OptionalFailure;
@@ -70,7 +71,7 @@ public class JobResultDeserializer extends 
StdDeserializer<JobResult> {
     public JobResult deserialize(final JsonParser p, final 
DeserializationContext ctxt)
             throws IOException {
         JobID jobId = null;
-        ApplicationStatus applicationStatus = ApplicationStatus.UNKNOWN;
+        JobStatus jobStatus = null;
         long netRuntime = -1;
         SerializedThrowable serializedThrowable = null;
         Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults = null;
@@ -90,8 +91,13 @@ public class JobResultDeserializer extends 
StdDeserializer<JobResult> {
                     break;
                 case JobResultSerializer.FIELD_NAME_APPLICATION_STATUS:
                     assertNextToken(p, JsonToken.VALUE_STRING);
-                    applicationStatus =
-                            
ApplicationStatus.valueOf(p.getValueAsString().toUpperCase());
+                    try {
+                        jobStatus =
+                                
ApplicationStatus.valueOf(p.getValueAsString().toUpperCase())
+                                        .deriveJobStatus();
+                    } catch (UnsupportedOperationException e) {
+                        // jobStatus = null to indicate that the job status is 
unknown
+                    }
                     break;
                 case JobResultSerializer.FIELD_NAME_NET_RUNTIME:
                     assertNextToken(p, JsonToken.VALUE_NUMBER_INT);
@@ -113,7 +119,7 @@ public class JobResultDeserializer extends 
StdDeserializer<JobResult> {
         try {
             return new JobResult.Builder()
                     .jobId(jobId)
-                    .applicationStatus(applicationStatus)
+                    .jobStatus(jobStatus)
                     .netRuntime(netRuntime)
                     .accumulatorResults(accumulatorResults)
                     .serializedThrowable(serializedThrowable)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
index 33e375c89ee..562e340c249 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages.json;
 
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedThrowable;
@@ -78,8 +79,9 @@ public class JobResultSerializer extends 
StdSerializer<JobResult> {
         gen.writeFieldName(FIELD_NAME_JOB_ID);
         jobIdSerializer.serialize(result.getJobId(), gen, provider);
 
+        // use application status to maintain backward compatibility
         gen.writeFieldName(FIELD_NAME_APPLICATION_STATUS);
-        gen.writeString(result.getApplicationStatus().name());
+        
gen.writeString(ApplicationStatus.fromJobStatus(result.getJobStatus().orElse(null)).name());
 
         gen.writeFieldName(FIELD_NAME_ACCUMULATOR_RESULTS);
         gen.writeStartObject();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index a6325b252ff..ab25cf02dd0 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -221,8 +221,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
     @Test
     public void testDuplicateJobSubmissionWithGloballyTerminatedButDirtyJob() 
throws Exception {
         final JobResult jobResult =
-                TestingJobResultStore.createJobResult(
-                        jobGraph.getJobID(), ApplicationStatus.SUCCEEDED);
+                TestingJobResultStore.createJobResult(jobGraph.getJobID(), 
JobStatus.FINISHED);
         haServices.getJobResultStore().createDirtyResultAsync(new 
JobResultEntry(jobResult)).get();
         assertDuplicateJobSubmission();
     }
@@ -230,8 +229,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
     @Test
     public void 
testDuplicateJobSubmissionWithGloballyTerminatedAndCleanedJob() throws 
Exception {
         final JobResult jobResult =
-                TestingJobResultStore.createJobResult(
-                        jobGraph.getJobID(), ApplicationStatus.SUCCEEDED);
+                TestingJobResultStore.createJobResult(jobGraph.getJobID(), 
JobStatus.FINISHED);
         haServices.getJobResultStore().createDirtyResultAsync(new 
JobResultEntry(jobResult)).get();
         
haServices.getJobResultStore().markResultAsCleanAsync(jobGraph.getJobID()).get();
 
@@ -450,8 +448,8 @@ public class DispatcherTest extends AbstractDispatcherTest {
 
         
assertThatFuture(dispatcherGateway.requestJobResult(jobGraph.getJobID(), 
TIMEOUT))
                 .eventuallySucceeds()
-                .extracting(JobResult::getApplicationStatus)
-                .isEqualTo(ApplicationStatus.CANCELED);
+                .extracting(jobResult -> jobResult.getJobStatus().orElse(null))
+                .isEqualTo(JobStatus.CANCELED);
     }
 
     @Test
@@ -910,7 +908,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
                                 Collections.singleton(
                                         new JobResult.Builder()
                                                 
.jobId(jobIdOfRecoveredDirtyJobs)
-                                                
.applicationStatus(ApplicationStatus.SUCCEEDED)
+                                                .jobStatus(JobStatus.FINISHED)
                                                 .netRuntime(1)
                                                 .build()))
                         .setDispatcherBootstrapFactory(
@@ -983,7 +981,7 @@ public class DispatcherTest extends AbstractDispatcherTest {
         dispatcher.close();
 
         final JobResult jobResult = jobResultFuture.get();
-        
assertThat(jobResult.getApplicationStatus()).isSameAs(ApplicationStatus.UNKNOWN);
+        assertThat(jobResult.getJobStatus().isPresent()).isFalse();
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
index 5dbe48c120e..f1c3331ad10 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import 
org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
@@ -386,7 +385,7 @@ class CheckpointResourcesCleanupRunnerTest {
         final JobID jobId = new JobID();
         final CheckpointResourcesCleanupRunner testInstance =
                 new TestInstanceBuilder()
-                        .withJobResult(createJobResult(jobId, 
ApplicationStatus.CANCELED))
+                        .withJobResult(createJobResult(jobId, 
JobStatus.CANCELED))
                         .build();
         assertThat(testInstance.getJobID()).isEqualTo(jobId);
     }
@@ -454,7 +453,7 @@ class CheckpointResourcesCleanupRunnerTest {
                 actualExecutionGraph ->
                         actualExecutionGraph
                                 .getState()
-                                
.equals(jobResult.getApplicationStatus().deriveJobStatus()));
+                                
.equals(jobResult.getJobStatus().orElseThrow()));
     }
 
     @Test
@@ -509,24 +508,20 @@ class CheckpointResourcesCleanupRunnerTest {
     }
 
     private static JobResult createDummySuccessJobResult() {
-        return createJobResult(new JobID(), ApplicationStatus.SUCCEEDED);
+        return createJobResult(new JobID(), JobStatus.FINISHED);
     }
 
     private static JobResult createJobResultWithFailure(SerializedThrowable 
throwable) {
         return new JobResult.Builder()
                 .jobId(new JobID())
-                .applicationStatus(ApplicationStatus.FAILED)
+                .jobStatus(JobStatus.FAILED)
                 .serializedThrowable(throwable)
                 .netRuntime(1)
                 .build();
     }
 
-    private static JobResult createJobResult(JobID jobId, ApplicationStatus 
applicationStatus) {
-        return new JobResult.Builder()
-                .jobId(jobId)
-                .applicationStatus(applicationStatus)
-                .netRuntime(1)
-                .build();
+    private static JobResult createJobResult(JobID jobId, JobStatus jobStatus) 
{
+        return new 
JobResult.Builder().jobId(jobId).jobStatus(jobStatus).netRuntime(1).build();
     }
 
     private static CheckpointRecoveryFactory createCheckpointRecoveryFactory() 
{
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
index 3294fed82bc..7271011a49a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.dispatcher.runner;
 
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.testutils.AllCallbackWrapper;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.DispatcherOperationCaches;
@@ -227,7 +227,7 @@ class ZooKeeperDefaultDispatcherRunnerTest {
                 // a successful cancellation should eventually remove all job 
information
                 final JobResult jobResult = jobResultFuture.get();
 
-                
assertThat(jobResult.getApplicationStatus()).isEqualTo(ApplicationStatus.CANCELED);
+                
assertThat(jobResult.getJobStatus().orElse(null)).isEqualTo(JobStatus.CANCELED);
 
                 dispatcherLeaderElection.notLeader();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
index 2a0e1dbcb5b..4e50c1d414f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
@@ -255,8 +255,8 @@ public class FileSystemJobResultStoreFileOperationsTest {
                 .extracting(JobResult::getJobId)
                 .isEqualTo(DUMMY_JOB_RESULT_ENTRY.getJobId());
         assertThat(deserializedJobResult)
-                .extracting(JobResult::getApplicationStatus)
-                
.isEqualTo(DUMMY_JOB_RESULT_ENTRY.getJobResult().getApplicationStatus());
+                .extracting(JobResult::getJobStatus)
+                
.isEqualTo(DUMMY_JOB_RESULT_ENTRY.getJobResult().getJobStatus());
         assertThat(deserializedJobResult)
                 .extracting(JobResult::getNetRuntime)
                 
.isEqualTo(DUMMY_JOB_RESULT_ENTRY.getJobResult().getNetRuntime());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
index d8adebff1f3..92a0c9b7dea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.BlobServerOptions;
@@ -29,7 +30,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.client.JobSubmissionException;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -207,14 +207,14 @@ public class BlobsCleanupITCase extends TestLogger {
                 // then the tasks will fail again and the restart strategy 
will finalise the job
                 final JobResult jobResult = resultFuture.get();
                 assertThat(jobResult.isSuccess(), is(false));
-                assertThat(jobResult.getApplicationStatus(), 
is(ApplicationStatus.FAILED));
+                assertThat(jobResult.getJobStatus().orElse(null), 
is(JobStatus.FAILED));
             } else if (testCase == TestCase.JOB_IS_CANCELLED) {
 
                 miniCluster.cancelJob(jid);
 
                 final JobResult jobResult = resultFuture.get();
                 assertThat(jobResult.isSuccess(), is(false));
-                assertThat(jobResult.getApplicationStatus(), 
is(ApplicationStatus.CANCELED));
+                assertThat(jobResult.getJobStatus().orElse(null), 
is(JobStatus.CANCELED));
             } else {
                 final JobResult jobResult = resultFuture.get();
                 Throwable cause =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
index dfe5869581b..f3c367b52ca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -677,8 +676,7 @@ class JobMasterServiceLeadershipRunnerTest {
     @Test
     void testJobAlreadyDone() throws Exception {
         final JobID jobId = new JobID();
-        final JobResult jobResult =
-                TestingJobResultStore.createJobResult(jobId, 
ApplicationStatus.UNKNOWN);
+        final JobResult jobResult = 
TestingJobResultStore.createJobResult(jobId, null);
         jobResultStore.createDirtyResultAsync(new 
JobResultEntry(jobResult)).get();
         try (JobManagerRunner jobManagerRunner =
                 newJobMasterServiceLeadershipRunnerBuilder()
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
index ba098f704e8..7b1de5c4a8a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.execution.Environment;
 import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -114,7 +113,7 @@ class LeaderChangeClusterComponentsTest {
         highAvailabilityServices.revokeDispatcherLeadership().get();
 
         JobResult jobResult = jobResultFuture.get();
-        
assertThat(jobResult.getApplicationStatus()).isEqualTo(ApplicationStatus.UNKNOWN);
+        assertThat(jobResult.getJobStatus().isPresent()).isFalse();
 
         highAvailabilityServices.grantDispatcherLeadership();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
index 041124b3906..278b3047546 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.rest.messages.job;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
 import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
@@ -64,7 +64,7 @@ public class JobExecutionResultResponseBodyTest
                         JobExecutionResultResponseBody.created(
                                 new JobResult.Builder()
                                         .jobId(TEST_JOB_ID)
-                                        
.applicationStatus(ApplicationStatus.SUCCEEDED)
+                                        .jobStatus(JobStatus.FINISHED)
                                         .netRuntime(TEST_NET_RUNTIME)
                                         .accumulatorResults(TEST_ACCUMULATORS)
                                         .serializedThrowable(
@@ -76,7 +76,7 @@ public class JobExecutionResultResponseBodyTest
                         JobExecutionResultResponseBody.created(
                                 new JobResult.Builder()
                                         .jobId(TEST_JOB_ID)
-                                        
.applicationStatus(ApplicationStatus.FAILED)
+                                        .jobStatus(JobStatus.FAILED)
                                         .netRuntime(TEST_NET_RUNTIME)
                                         .accumulatorResults(TEST_ACCUMULATORS)
                                         .build())
@@ -117,8 +117,8 @@ public class JobExecutionResultResponseBodyTest
 
             assertThat(actualJobExecutionResult.getJobId())
                     .isEqualTo(expectedJobExecutionResult.getJobId());
-            assertThat(actualJobExecutionResult.getApplicationStatus())
-                    
.isEqualTo(expectedJobExecutionResult.getApplicationStatus());
+            assertThat(actualJobExecutionResult.getJobStatus())
+                    .isEqualTo(expectedJobExecutionResult.getJobStatus());
             assertThat(actualJobExecutionResult.getNetRuntime())
                     .isEqualTo(expectedJobExecutionResult.getNetRuntime());
             assertThat(actualJobExecutionResult.getAccumulatorResults())
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java
index 6790189f31c..63606bab2bd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java
@@ -19,13 +19,15 @@
 package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.highavailability.JobResultEntry;
 import org.apache.flink.runtime.highavailability.JobResultStore;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Set;
@@ -41,15 +43,11 @@ public class TestingJobResultStore implements 
JobResultStore {
     public static final JobResult DUMMY_JOB_RESULT = 
createSuccessfulJobResult(new JobID());
 
     public static JobResult createSuccessfulJobResult(JobID jobId) {
-        return createJobResult(jobId, ApplicationStatus.SUCCEEDED);
+        return createJobResult(jobId, JobStatus.FINISHED);
     }
 
-    public static JobResult createJobResult(JobID jobId, ApplicationStatus 
applicationStatus) {
-        return new JobResult.Builder()
-                .jobId(jobId)
-                .applicationStatus(applicationStatus)
-                .netRuntime(1)
-                .build();
+    public static JobResult createJobResult(JobID jobId, @Nullable JobStatus 
jobStatus) {
+        return new 
JobResult.Builder().jobId(jobId).jobStatus(jobStatus).netRuntime(1).build();
     }
 
     private final Function<JobResultEntry, CompletableFuture<Void>> 
createDirtyResultConsumer;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
index 55dabf253c4..0c5c9da827c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.OpenContext;
@@ -31,7 +32,6 @@ import 
org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
@@ -286,7 +286,7 @@ public abstract class ChangelogRecoveryITCaseBase extends 
TestLogger {
         if (jobResult.getSerializedThrowable().isPresent()) {
             throw jobResult.getSerializedThrowable().get();
         }
-        assertSame(ApplicationStatus.SUCCEEDED, 
jobResult.getApplicationStatus());
+        assertSame(JobStatus.FINISHED, jobResult.getJobStatus().orElse(null));
     }
 
     private Configuration configure() throws IOException {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
index 45a2433da36..5b987adc0c4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
@@ -31,7 +31,6 @@ import 
org.apache.flink.core.execution.PipelineExecutorFactory;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.jobmaster.JobResult.Builder;
@@ -195,7 +194,7 @@ public class RemoteStreamEnvironmentTest extends TestLogger 
{
                     new Builder()
                             .jobId(this.jobId)
                             .netRuntime(0)
-                            .applicationStatus(ApplicationStatus.SUCCEEDED)
+                            .jobStatus(JobStatus.FINISHED)
                             .build();
             return CompletableFuture.completedFuture(jobResult);
         }

Reply via email to