This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 7606ccf  [FLINK-10400] Fail JobResult if application finished in 
CANCELED or FAILED state
7606ccf is described below

commit 7606ccf56b8c0943cfa43888af5bb51c13018441
Author: Till Rohrmann <[email protected]>
AuthorDate: Sun Sep 23 21:09:19 2018 +0200

    [FLINK-10400] Fail JobResult if application finished in CANCELED or FAILED 
state
    
    In case of the CANCELED state, the client will throw a 
JobCancellationException.
    In case of the FAILED state, the client will throw a JobExecutionException.
    
    This closes #6742.
---
 .../flink/client/program/MiniClusterClient.java    |  5 +-
 .../client/program/rest/RestClusterClient.java     |  5 +-
 .../client/program/rest/RestClusterClientTest.java | 11 ++-
 .../apache/flink/runtime/jobmaster/JobResult.java  | 79 ++++++++++++++--------
 .../flink/runtime/minicluster/MiniCluster.java     |  2 -
 .../rest/messages/json/JobResultDeserializer.java  |  7 ++
 .../rest/messages/json/JobResultSerializer.java    |  5 ++
 .../flink/runtime/jobmaster/JobResultTest.java     | 65 ++++++++++++++++++
 .../job/JobExecutionResultResponseBodyTest.java    |  4 ++
 9 files changed, 148 insertions(+), 35 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 81cf784..3077f18 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -94,8 +95,8 @@ public class MiniClusterClient extends 
ClusterClient<MiniClusterClient.MiniClust
 
                        try {
                                return 
jobResult.toJobExecutionResult(classLoader);
-                       } catch (JobResult.WrappedJobException e) {
-                               throw new ProgramInvocationException("Job 
failed", jobGraph.getJobID(), e.getCause());
+                       } catch (JobExecutionException e) {
+                               throw new ProgramInvocationException("Job 
failed", jobGraph.getJobID(), e);
                        } catch (IOException | ClassNotFoundException e) {
                                throw new ProgramInvocationException("Job 
failed", jobGraph.getJobID(), e);
                        }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 935a07f..86cc52d 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -263,8 +264,8 @@ public class RestClusterClient<T> extends ClusterClient<T> 
implements NewCluster
                        try {
                                this.lastJobExecutionResult = 
jobResult.toJobExecutionResult(classLoader);
                                return lastJobExecutionResult;
-                       } catch (JobResult.WrappedJobException we) {
-                               throw new ProgramInvocationException("Job 
failed.", jobGraph.getJobID(), we.getCause());
+                       } catch (JobExecutionException e) {
+                               throw new ProgramInvocationException("Job 
failed.", jobGraph.getJobID(), e);
                        } catch (IOException | ClassNotFoundException e) {
                                throw new ProgramInvocationException("Job 
failed.", jobGraph.getJobID(), e);
                        }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 75f16c0..abe59d3 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -122,6 +123,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -229,6 +231,7 @@ public class RestClusterClientTest extends TestLogger {
                TestJobExecutionResultHandler testJobExecutionResultHandler =
                        new TestJobExecutionResultHandler(
                                JobExecutionResultResponseBody.created(new 
JobResult.Builder()
+                                       
.applicationStatus(ApplicationStatus.SUCCEEDED)
                                        .jobId(jobId)
                                        .netRuntime(Long.MAX_VALUE)
                                        .build()));
@@ -351,11 +354,13 @@ public class RestClusterClientTest extends TestLogger {
                                new RestHandlerException("should trigger 
retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
                                JobExecutionResultResponseBody.inProgress(),
                                JobExecutionResultResponseBody.created(new 
JobResult.Builder()
+                                       
.applicationStatus(ApplicationStatus.SUCCEEDED)
                                        .jobId(jobId)
                                        .netRuntime(Long.MAX_VALUE)
                                        
.accumulatorResults(Collections.singletonMap("testName", new 
SerializedValue<>(OptionalFailure.of(1.0))))
                                        .build()),
                                JobExecutionResultResponseBody.created(new 
JobResult.Builder()
+                                       
.applicationStatus(ApplicationStatus.FAILED)
                                        .jobId(jobId)
                                        .netRuntime(Long.MAX_VALUE)
                                        .serializedThrowable(new 
SerializedThrowable(new RuntimeException("expected")))
@@ -385,8 +390,10 @@ public class RestClusterClientTest extends TestLogger {
                                restClusterClient.submitJob(jobGraph, 
ClassLoader.getSystemClassLoader());
                                fail("Expected exception not thrown.");
                        } catch (final ProgramInvocationException e) {
-                               assertThat(e.getCause(), 
instanceOf(RuntimeException.class));
-                               assertThat(e.getCause().getMessage(), 
equalTo("expected"));
+                               final Optional<RuntimeException> cause = 
ExceptionUtils.findThrowable(e, RuntimeException.class);
+
+                               assertThat(cause.isPresent(), is(true));
+                               assertThat(cause.get().getMessage(), 
equalTo("expected"));
                        }
                }
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
index 60ddbe3..eb7c473 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
@@ -22,11 +22,13 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
@@ -54,6 +56,8 @@ public class JobResult implements Serializable {
 
        private final JobID jobId;
 
+       private final ApplicationStatus applicationStatus;
+
        private final Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults;
 
        private final long netRuntime;
@@ -64,6 +68,7 @@ public class JobResult implements Serializable {
 
        private JobResult(
                        final JobID jobId,
+                       final ApplicationStatus applicationStatus,
                        final Map<String, 
SerializedValue<OptionalFailure<Object>>> accumulatorResults,
                        final long netRuntime,
                        @Nullable final SerializedThrowable 
serializedThrowable) {
@@ -71,6 +76,7 @@ public class JobResult implements Serializable {
                checkArgument(netRuntime >= 0, "netRuntime must be greater than 
or equals 0");
 
                this.jobId = requireNonNull(jobId);
+               this.applicationStatus = requireNonNull(applicationStatus);
                this.accumulatorResults = requireNonNull(accumulatorResults);
                this.netRuntime = netRuntime;
                this.serializedThrowable = serializedThrowable;
@@ -80,13 +86,17 @@ public class JobResult implements Serializable {
         * Returns {@code true} if the job finished successfully.
         */
        public boolean isSuccess() {
-               return serializedThrowable == null;
+               return applicationStatus == ApplicationStatus.SUCCEEDED || 
(applicationStatus == ApplicationStatus.UNKNOWN && serializedThrowable == null);
        }
 
        public JobID getJobId() {
                return jobId;
        }
 
+       public ApplicationStatus getApplicationStatus() {
+               return applicationStatus;
+       }
+
        public Map<String, SerializedValue<OptionalFailure<Object>>> 
getAccumulatorResults() {
                return accumulatorResults;
        }
@@ -108,22 +118,40 @@ public class JobResult implements Serializable {
         *
         * @param classLoader to use for deserialization
         * @return JobExecutionResult
-        * @throws WrappedJobException if the JobResult contains a serialized 
exception
+        * @throws JobCancellationException if the job was cancelled
+        * @throws JobExecutionException if the job execution did not succeed
         * @throws IOException if the accumulator could not be deserialized
         * @throws ClassNotFoundException if the accumulator could not 
deserialized
         */
-       public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) 
throws WrappedJobException, IOException, ClassNotFoundException {
-               if (serializedThrowable != null) {
-                       final Throwable throwable = 
serializedThrowable.deserializeError(classLoader);
-                       throw new WrappedJobException(throwable);
-               }
+       public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) 
throws JobExecutionException, IOException, ClassNotFoundException {
+               if (applicationStatus == ApplicationStatus.SUCCEEDED) {
+                       return new JobExecutionResult(
+                               jobId,
+                               netRuntime,
+                               AccumulatorHelper.deserializeAccumulators(
+                                       accumulatorResults,
+                                       classLoader));
+               } else {
+                       final Throwable cause;
+
+                       if (serializedThrowable == null) {
+                               cause = null;
+                       } else {
+                               cause = 
serializedThrowable.deserializeError(classLoader);
+                       }
+
+                       final JobExecutionException exception;
+
+                       if (applicationStatus == ApplicationStatus.FAILED) {
+                               exception = new JobExecutionException(jobId, 
"Job execution failed.", cause);
+                       } else if (applicationStatus == 
ApplicationStatus.CANCELED) {
+                               exception = new JobCancellationException(jobId, 
"Job was cancelled.", cause);
+                       } else {
+                               exception = new JobExecutionException(jobId, 
"Job completed with illegal application status: " + applicationStatus + '.', 
cause);
+                       }
 
-               return new JobExecutionResult(
-                       jobId,
-                       netRuntime,
-                       AccumulatorHelper.deserializeAccumulators(
-                               accumulatorResults,
-                               classLoader));
+                       throw exception;
+               }
        }
 
        /**
@@ -134,6 +162,8 @@ public class JobResult implements Serializable {
 
                private JobID jobId;
 
+               private ApplicationStatus applicationStatus = 
ApplicationStatus.UNKNOWN;
+
                private Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults;
 
                private long netRuntime = -1;
@@ -145,6 +175,11 @@ public class JobResult implements Serializable {
                        return this;
                }
 
+               public Builder applicationStatus(final ApplicationStatus 
applicationStatus) {
+                       this.applicationStatus = applicationStatus;
+                       return this;
+               }
+
                public Builder accumulatorResults(final Map<String, 
SerializedValue<OptionalFailure<Object>>> accumulatorResults) {
                        this.accumulatorResults = accumulatorResults;
                        return this;
@@ -163,6 +198,7 @@ public class JobResult implements Serializable {
                public JobResult build() {
                        return new JobResult(
                                jobId,
+                               applicationStatus,
                                accumulatorResults == null ? 
Collections.emptyMap() : accumulatorResults,
                                netRuntime,
                                serializedThrowable);
@@ -188,6 +224,8 @@ public class JobResult implements Serializable {
                final JobResult.Builder builder = new JobResult.Builder();
                builder.jobId(jobId);
 
+               
builder.applicationStatus(ApplicationStatus.fromJobStatus(accessExecutionGraph.getState()));
+
                final long netRuntime = 
accessExecutionGraph.getStatusTimestamp(jobStatus) - 
accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED);
                // guard against clock changes
                final long guardedNetRuntime = Math.max(netRuntime, 0L);
@@ -204,17 +242,4 @@ public class JobResult implements Serializable {
 
                return builder.build();
        }
-
-       /**
-        * Exception which indicates that the job has finished with an {@link 
Exception}.
-        */
-       public static final class WrappedJobException extends FlinkException {
-
-               private static final long serialVersionUID = 
6535061898650156019L;
-
-               public WrappedJobException(Throwable cause) {
-                       super(cause);
-               }
-       }
-
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 97ab5a5..065dbdc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -619,8 +619,6 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
 
                try {
                        return 
jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
-               } catch (JobResult.WrappedJobException e) {
-                       throw new JobExecutionException(job.getJobID(), 
e.getCause());
                } catch (IOException | ClassNotFoundException e) {
                        throw new JobExecutionException(job.getJobID(), e);
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
index e568f47..8342eb3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages.json;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedThrowable;
@@ -68,6 +69,7 @@ public class JobResultDeserializer extends 
StdDeserializer<JobResult> {
        @Override
        public JobResult deserialize(final JsonParser p, final 
DeserializationContext ctxt) throws IOException {
                JobID jobId = null;
+               ApplicationStatus applicationStatus = ApplicationStatus.UNKNOWN;
                long netRuntime = -1;
                SerializedThrowable serializedThrowable = null;
                Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults = null;
@@ -85,6 +87,10 @@ public class JobResultDeserializer extends 
StdDeserializer<JobResult> {
                                        assertNextToken(p, 
JsonToken.VALUE_STRING);
                                        jobId = 
jobIdDeserializer.deserialize(p, ctxt);
                                        break;
+                               case 
JobResultSerializer.FIELD_NAME_APPLICATION_STATUS:
+                                       assertNextToken(p, 
JsonToken.VALUE_STRING);
+                                       applicationStatus = 
ApplicationStatus.valueOf(p.getValueAsString().toUpperCase());
+                                       break;
                                case JobResultSerializer.FIELD_NAME_NET_RUNTIME:
                                        assertNextToken(p, 
JsonToken.VALUE_NUMBER_INT);
                                        netRuntime = p.getLongValue();
@@ -105,6 +111,7 @@ public class JobResultDeserializer extends 
StdDeserializer<JobResult> {
                try {
                        return new JobResult.Builder()
                                .jobId(jobId)
+                               .applicationStatus(applicationStatus)
                                .netRuntime(netRuntime)
                                .accumulatorResults(accumulatorResults)
                                .serializedThrowable(serializedThrowable)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
index 694fa2f..cdf3541 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
@@ -44,6 +44,8 @@ public class JobResultSerializer extends 
StdSerializer<JobResult> {
 
        static final String FIELD_NAME_JOB_ID = "id";
 
+       static final String FIELD_NAME_APPLICATION_STATUS = 
"application-status";
+
        static final String FIELD_NAME_NET_RUNTIME = "net-runtime";
 
        static final String FIELD_NAME_ACCUMULATOR_RESULTS = 
"accumulator-results";
@@ -76,6 +78,9 @@ public class JobResultSerializer extends 
StdSerializer<JobResult> {
                gen.writeFieldName(FIELD_NAME_JOB_ID);
                jobIdSerializer.serialize(result.getJobId(), gen, provider);
 
+               gen.writeFieldName(FIELD_NAME_APPLICATION_STATUS);
+               gen.writeString(result.getApplicationStatus().name());
+
                gen.writeFieldName(FIELD_NAME_ACCUMULATOR_RESULTS);
                gen.writeStartObject();
                final Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults = result.getAccumulatorResults();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
index 84c9da5..6543fa2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
@@ -19,12 +19,19 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -66,4 +73,62 @@ public class JobResultTest extends TestLogger {
                assertThat(jobResult.isSuccess(), equalTo(true));
        }
 
+       @Test
+       public void testCancelledJobIsFailureResult() {
+               final JobResult jobResult = JobResult.createFrom(
+                       new ArchivedExecutionGraphBuilder()
+                               .setJobID(new JobID())
+                               .setState(JobStatus.CANCELED)
+                               .build());
+
+               assertThat(jobResult.isSuccess(), is(false));
+       }
+
+       @Test
+       public void testFailedJobIsFailureResult() {
+               final JobResult jobResult = JobResult.createFrom(
+                       new ArchivedExecutionGraphBuilder()
+                               .setJobID(new JobID())
+                               .setState(JobStatus.FAILED)
+                               .setFailureCause(new ErrorInfo(new 
FlinkException("Test exception"), 42L))
+                               .build());
+
+               assertThat(jobResult.isSuccess(), is(false));
+       }
+
+       @Test
+       public void testCancelledJobThrowsJobCancellationException() throws 
Exception {
+               final FlinkException cause = new FlinkException("Test 
exception");
+               final JobResult jobResult = JobResult.createFrom(
+                       new ArchivedExecutionGraphBuilder()
+                               .setJobID(new JobID())
+                               .setState(JobStatus.CANCELED)
+                               .setFailureCause(new ErrorInfo(cause, 42L))
+                               .build());
+
+               try {
+                       
jobResult.toJobExecutionResult(getClass().getClassLoader());
+                       fail("Job should fail with an 
JobCancellationException.");
+               } catch (JobCancellationException expected) {
+                       assertThat(expected.getCause(), is(equalTo(cause)));
+               }
+       }
+
+       @Test
+       public void testFailedJobThrowsJobExecutionException() throws Exception 
{
+               final FlinkException cause = new FlinkException("Test 
exception");
+               final JobResult jobResult = JobResult.createFrom(
+                       new ArchivedExecutionGraphBuilder()
+                               .setJobID(new JobID())
+                               .setState(JobStatus.FAILED)
+                               .setFailureCause(new ErrorInfo(cause, 42L))
+                               .build());
+
+               try {
+                       
jobResult.toJobExecutionResult(getClass().getClassLoader());
+                       fail("Job should fail with JobExecutionException.");
+               } catch (JobExecutionException expected) {
+                       assertThat(expected.getCause(), is(equalTo(cause)));
+               }
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
index 9534d2b..c8cc7f3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages.job;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
 import org.apache.flink.util.OptionalFailure;
@@ -64,12 +65,14 @@ public class JobExecutionResultResponseBodyTest
                return Arrays.asList(new Object[][] {
                        {JobExecutionResultResponseBody.created(new 
JobResult.Builder()
                                .jobId(TEST_JOB_ID)
+                               .applicationStatus(ApplicationStatus.SUCCEEDED)
                                .netRuntime(TEST_NET_RUNTIME)
                                .accumulatorResults(TEST_ACCUMULATORS)
                                .serializedThrowable(new 
SerializedThrowable(new RuntimeException("expected")))
                                .build())},
                        {JobExecutionResultResponseBody.created(new 
JobResult.Builder()
                                .jobId(TEST_JOB_ID)
+                               .applicationStatus(ApplicationStatus.FAILED)
                                .netRuntime(TEST_NET_RUNTIME)
                                .accumulatorResults(TEST_ACCUMULATORS)
                                .build())},
@@ -108,6 +111,7 @@ public class JobExecutionResultResponseBodyTest
                        assertNotNull(actualJobExecutionResult);
 
                        assertThat(actualJobExecutionResult.getJobId(), 
equalTo(expectedJobExecutionResult.getJobId()));
+                       
assertThat(actualJobExecutionResult.getApplicationStatus(), 
equalTo(expectedJobExecutionResult.getApplicationStatus()));
                        assertThat(actualJobExecutionResult.getNetRuntime(), 
equalTo(expectedJobExecutionResult.getNetRuntime()));
                        
assertThat(actualJobExecutionResult.getAccumulatorResults(), 
equalTo(expectedJobExecutionResult.getAccumulatorResults()));
 

Reply via email to