This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch client-exception
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 498033af4d323340a4a4d629cbd1be2d0379ce32
Author: Kostas Kloudas <[email protected]>
AuthorDate: Thu Nov 12 08:55:17 2020 +0100

    comments
---
 .../deployment/ClusterClientJobClientAdapter.java  |  4 +-
 .../ApplicationDispatcherBootstrap.java            | 10 +++--
 .../deployment/application/EmbeddedJobClient.java  |  4 +-
 .../client/JobExecutionResultException.java        | 46 +---------------------
 .../apache/flink/runtime/jobmaster/JobResult.java  | 43 +++++++++++---------
 .../runtime/minicluster/MiniClusterJobClient.java  |  4 +-
 .../flink/runtime/jobmaster/JobResultTest.java     | 15 +++++--
 7 files changed, 47 insertions(+), 79 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index 159aaf9..1c6d8af 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.client.JobExecutionResultException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
@@ -116,8 +115,7 @@ public class ClusterClientJobClientAdapter<ClusterID> 
implements JobClient, Coor
                                                try {
                                                        return 
jobResult.toJobExecutionResult(classLoader);
                                                } catch (Throwable t) {
-                                                       throw new 
CompletionException(
-                                                                       
JobExecutionResultException.fromJobResult(jobResult, classLoader, t));
+                                                       throw new 
CompletionException(t);
                                                }
                                        })));
        }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
index 323fb6a..1aaf638 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
@@ -284,9 +284,13 @@ public class ApplicationDispatcherBootstrap implements 
DispatcherBootstrap {
                                return result;
                        }
 
-                       throw new CompletionException(
-                                       
JobExecutionResultException.fromJobResult(
-                                                       result, 
application.getUserCodeClassLoader()));
+                       final ClassLoader userClassLoader = 
application.getUserCodeClassLoader();
+                       try {
+                               result.toJobExecutionResult(userClassLoader);
+                               throw new IllegalStateException("No exception 
thrown although the job execution was not successful.");
+                       } catch (Throwable t) {
+                               throw new CompletionException(t);
+                       }
                });
        }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
index dbfec71..4787324 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.client.JobExecutionResultException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -127,8 +126,7 @@ public class EmbeddedJobClient implements JobClient, 
CoordinationRequestGateway
                                        try {
                                                return 
jobResult.toJobExecutionResult(classLoader);
                                        } catch (Throwable t) {
-                                               throw new CompletionException(
-                                                               
JobExecutionResultException.fromJobResult(jobResult, classLoader, t));
+                                               throw new 
CompletionException(t);
                                        }
                                });
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionResultException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionResultException.java
index 869e38a..78738a6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionResultException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionResultException.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.client;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.jobmaster.JobResult;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This exception is the base exception for all exceptions that signal the 
failure of an
  * application with a given {@link ApplicationStatus}.
+ *
+ *      * @throws JobCancellationException if the job was cancelled
  */
 public class JobExecutionResultException extends JobExecutionException {
 
@@ -45,46 +45,4 @@ public class JobExecutionResultException extends 
JobExecutionException {
        public ApplicationStatus getStatus() {
                return status;
        }
-
-       public static JobExecutionResultException fromJobResult(
-                       final JobResult result,
-                       final ClassLoader userClassLoader) {
-               return fromJobResult(result, userClassLoader, true, null);
-       }
-
-       public static JobExecutionResultException fromJobResult(
-                       final JobResult result,
-                       final ClassLoader userClassLoader,
-                       final Throwable throwable) {
-               return fromJobResult(result, userClassLoader, false, throwable);
-       }
-
-       private static JobExecutionResultException fromJobResult(
-                       final JobResult result,
-                       final ClassLoader userClassLoader,
-                       final boolean isUnwrapped,
-                       Throwable throwable) {
-
-               checkState(result != null && !result.isSuccess());
-               checkNotNull(userClassLoader);
-
-               if (isUnwrapped) {
-                       // We do this to uniformize the behavior of the 
"ATTACHED" and "DETACHED"
-                       // in application mode, while maintaining the expected 
exceptions thrown in case
-                       // of a failed job execution.
-                       try {
-                               result.toJobExecutionResult(userClassLoader);
-                               throw new IllegalStateException("No exception 
thrown although the job execution was not successful.");
-                       } catch (Throwable t) {
-                               throwable = t;
-                       }
-               }
-
-               final JobID jobID = result.getJobId();
-               final ApplicationStatus status = result.getApplicationStatus();
-
-               return status == ApplicationStatus.CANCELED || status == 
ApplicationStatus.FAILED
-                               ? new JobExecutionResultException(jobID, 
status, "Application Status: " + status.name(), throwable)
-                               : new JobExecutionResultException(jobID, 
ApplicationStatus.UNKNOWN, "Job failed for unknown reason.", throwable);
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
index 9688919..5f8171e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobExecutionResultException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -119,12 +120,11 @@ public class JobResult implements Serializable {
         *
         * @param classLoader to use for deserialization
         * @return JobExecutionResult
-        * @throws JobCancellationException if the job was cancelled
-        * @throws JobExecutionException if the job execution did not succeed
+        * @throws JobExecutionResultException if the job execution did not 
succeed
         * @throws IOException if the accumulator could not be deserialized
         * @throws ClassNotFoundException if the accumulator could not be 
deserialized
         */
-       public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) 
throws JobExecutionException, IOException, ClassNotFoundException {
+       public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) 
throws JobExecutionResultException, IOException, ClassNotFoundException {
                if (applicationStatus == ApplicationStatus.SUCCEEDED) {
                        return new JobExecutionResult(
                                jobId,
@@ -133,26 +133,31 @@ public class JobResult implements Serializable {
                                        accumulatorResults,
                                        classLoader));
                } else {
-                       final Throwable cause;
+                       final JobExecutionException exception =
+                                       
getJobExecutionException(serializedThrowable, classLoader);
 
-                       if (serializedThrowable == null) {
-                               cause = null;
-                       } else {
-                               cause = 
serializedThrowable.deserializeError(classLoader);
-                       }
-
-                       final JobExecutionException exception;
+                       throw applicationStatus == ApplicationStatus.CANCELED 
|| applicationStatus == ApplicationStatus.FAILED
+                                       ? new 
JobExecutionResultException(jobId, applicationStatus, "Application Status: " + 
applicationStatus.name(), exception)
+                                       : new 
JobExecutionResultException(jobId, ApplicationStatus.UNKNOWN, "Job failed for 
unknown reason.", exception);
+               }
+       }
 
-                       if (applicationStatus == ApplicationStatus.FAILED) {
-                               exception = new JobExecutionException(jobId, 
"Job execution failed.", cause);
-                       } else if (applicationStatus == 
ApplicationStatus.CANCELED) {
-                               exception = new JobCancellationException(jobId, 
"Job was cancelled.", cause);
-                       } else {
-                               exception = new JobExecutionException(jobId, 
"Job completed with illegal application status: " + applicationStatus + '.', 
cause);
-                       }
+       private JobExecutionException getJobExecutionException(final 
SerializedThrowable throwable, final ClassLoader classLoader) {
+               final Throwable cause = getCause(throwable, classLoader);
 
-                       throw exception;
+               final JobExecutionException exception;
+               if (applicationStatus == ApplicationStatus.FAILED) {
+                       exception = new JobExecutionException(jobId, "Job 
execution failed.", cause);
+               } else if (applicationStatus == ApplicationStatus.CANCELED) {
+                       exception = new JobCancellationException(jobId, "Job 
was cancelled.", cause);
+               } else {
+                       exception = new JobExecutionException(jobId, "Job 
completed with illegal application status: " + applicationStatus + '.', cause);
                }
+               return exception;
+       }
+
+       private static Throwable getCause(final SerializedThrowable throwable, 
final ClassLoader classLoader) {
+               return throwable == null ? null : 
throwable.deserializeError(classLoader);
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
index 9c43736..b8d1d90 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.client.JobExecutionResultException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
@@ -119,8 +118,7 @@ public final class MiniClusterJobClient implements 
JobClient, CoordinationReques
                        try {
                                return result.toJobExecutionResult(classLoader);
                        } catch (Exception e) {
-                               throw new CompletionException(
-                                               
JobExecutionResultException.fromJobResult(result, classLoader, e));
+                               throw new CompletionException(e);
                        }
                });
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
index a393178..e87bd48 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobExecutionResultException;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.util.FlinkException;
@@ -31,6 +32,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.core.IsNull.nullValue;
 import static org.junit.Assert.assertThat;
@@ -110,9 +112,11 @@ public class JobResultTest extends TestLogger {
                try {
                        
jobResult.toJobExecutionResult(getClass().getClassLoader());
                        fail("Job should fail with an 
JobCancellationException.");
-               } catch (JobCancellationException expected) {
+               } catch (JobExecutionResultException expected) {
+
+                       assertThat(expected.getCause(), 
is(instanceOf(JobCancellationException.class)));
                        // the failure cause in the execution graph should not 
be the cause of the canceled job result
-                       assertThat(expected.getCause(), is(nullValue()));
+                       assertThat(expected.getCause().getCause(), 
is(nullValue()));
                }
        }
 
@@ -129,8 +133,11 @@ public class JobResultTest extends TestLogger {
                try {
                        
jobResult.toJobExecutionResult(getClass().getClassLoader());
                        fail("Job should fail with JobExecutionException.");
-               } catch (JobExecutionException expected) {
-                       assertThat(expected.getCause(), is(equalTo(cause)));
+               } catch (JobExecutionResultException expected) {
+
+                       assertThat(expected.getCause(), 
is(instanceOf(JobExecutionException.class)));
+                       // the failure cause in the execution graph should be 
the root cause of the failed job result
+                       assertThat(expected.getCause().getCause(), 
is(equalTo(cause)));
                }
        }
 

Reply via email to