This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch client-exception in repository https://gitbox.apache.org/repos/asf/flink.git
commit 498033af4d323340a4a4d629cbd1be2d0379ce32 Author: Kostas Kloudas <[email protected]> AuthorDate: Thu Nov 12 08:55:17 2020 +0100 comments --- .../deployment/ClusterClientJobClientAdapter.java | 4 +- .../ApplicationDispatcherBootstrap.java | 10 +++-- .../deployment/application/EmbeddedJobClient.java | 4 +- .../client/JobExecutionResultException.java | 46 +--------------------- .../apache/flink/runtime/jobmaster/JobResult.java | 43 +++++++++++--------- .../runtime/minicluster/MiniClusterJobClient.java | 4 +- .../flink/runtime/jobmaster/JobResultTest.java | 15 +++++-- 7 files changed, 47 insertions(+), 79 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java index 159aaf9..1c6d8af 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.core.execution.JobClient; -import org.apache.flink.runtime.client.JobExecutionResultException; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; @@ -116,8 +115,7 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient, Coor try { return jobResult.toJobExecutionResult(classLoader); } catch (Throwable t) { - throw new CompletionException( - JobExecutionResultException.fromJobResult(jobResult, classLoader, t)); + throw new CompletionException(t); } }))); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java index 323fb6a..1aaf638 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java @@ -284,9 +284,13 @@ public class ApplicationDispatcherBootstrap implements DispatcherBootstrap { return result; } - throw new CompletionException( - JobExecutionResultException.fromJobResult( - result, application.getUserCodeClassLoader())); + final ClassLoader userClassLoader = application.getUserCodeClassLoader(); + try { + result.toJobExecutionResult(userClassLoader); + throw new IllegalStateException("No exception thrown although the job execution was not successful."); + } catch (Throwable t) { + throw new CompletionException(t); + } }); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java index dbfec71..4787324 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.execution.JobClient; -import org.apache.flink.runtime.client.JobExecutionResultException; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.dispatcher.DispatcherGateway; @@ -127,8 +126,7 @@ public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway try { return jobResult.toJobExecutionResult(classLoader); } catch (Throwable t) { - throw new CompletionException( - JobExecutionResultException.fromJobResult(jobResult, classLoader, t)); + throw new CompletionException(t); } }); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionResultException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionResultException.java index 869e38a..78738a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionResultException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionResultException.java @@ -20,14 +20,14 @@ package org.apache.flink.runtime.client; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.ApplicationStatus; -import org.apache.flink.runtime.jobmaster.JobResult; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * This exception is the base exception for all exceptions that signals the failure of an * application with a given {@link ApplicationStatus}. + * + * * @throws JobCancellationException if the job was cancelled */ public class JobExecutionResultException extends JobExecutionException { @@ -45,46 +45,4 @@ public class JobExecutionResultException extends JobExecutionException { public ApplicationStatus getStatus() { return status; } - - public static JobExecutionResultException fromJobResult( - final JobResult result, - final ClassLoader userClassLoader) { - return fromJobResult(result, userClassLoader, true, null); - } - - public static JobExecutionResultException fromJobResult( - final JobResult result, - final ClassLoader userClassLoader, - final Throwable throwable) { - return fromJobResult(result, userClassLoader, false, throwable); - } - - private static JobExecutionResultException fromJobResult( - final JobResult result, - final ClassLoader userClassLoader, - final boolean isUnwrapped, - Throwable throwable) { - - checkState(result != null && !result.isSuccess()); - checkNotNull(userClassLoader); - - if (isUnwrapped) { - // We do this to uniformize the behavior of the "ATTACHED" and "DETACHED" - // in application mode, while maintaining the expected exceptions thrown in case - // of a failed job execution. - try { - result.toJobExecutionResult(userClassLoader); - throw new IllegalStateException("No exception thrown although the job execution was not successful."); - } catch (Throwable t) { - throwable = t; - } - } - - final JobID jobID = result.getJobId(); - final ApplicationStatus status = result.getApplicationStatus(); - - return status == ApplicationStatus.CANCELED || status == ApplicationStatus.FAILED - ? new JobExecutionResultException(jobID, status, "Application Status: " + status.name(), throwable) - : new JobExecutionResultException(jobID, ApplicationStatus.UNKNOWN, "Job failed for unknown reason.", throwable); - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java index 9688919..5f8171e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobExecutionResultException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.dispatcher.Dispatcher; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; @@ -119,12 +120,11 @@ public class JobResult implements Serializable { * * @param classLoader to use for deserialization * @return JobExecutionResult - * @throws JobCancellationException if the job was cancelled - * @throws JobExecutionException if the job execution did not succeed + * @throws JobExecutionResultException if the job execution did not succeed * @throws IOException if the accumulator could not be deserialized * @throws ClassNotFoundException if the accumulator could not deserialized */ - public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws JobExecutionException, IOException, ClassNotFoundException { + public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws JobExecutionResultException, IOException, ClassNotFoundException { if (applicationStatus == ApplicationStatus.SUCCEEDED) { return new JobExecutionResult( jobId, @@ -133,26 +133,31 @@ public class JobResult implements Serializable { accumulatorResults, classLoader)); } else { - final Throwable cause; + final JobExecutionException exception = + getJobExecutionException(serializedThrowable, classLoader); - if (serializedThrowable == null) { - cause = null; - } else { - cause = serializedThrowable.deserializeError(classLoader); - } - - final JobExecutionException exception; + throw applicationStatus == ApplicationStatus.CANCELED || applicationStatus == ApplicationStatus.FAILED + ? new JobExecutionResultException(jobId, applicationStatus, "Application Status: " + applicationStatus.name(), exception) + : new JobExecutionResultException(jobId, ApplicationStatus.UNKNOWN, "Job failed for unknown reason.", exception); + } + } - if (applicationStatus == ApplicationStatus.FAILED) { - exception = new JobExecutionException(jobId, "Job execution failed.", cause); - } else if (applicationStatus == ApplicationStatus.CANCELED) { - exception = new JobCancellationException(jobId, "Job was cancelled.", cause); - } else { - exception = new JobExecutionException(jobId, "Job completed with illegal application status: " + applicationStatus + '.', cause); - } + private JobExecutionException getJobExecutionException(final SerializedThrowable throwable, final ClassLoader classLoader) { + final Throwable cause = getCause(throwable, classLoader); - throw exception; + final JobExecutionException exception; + if (applicationStatus == ApplicationStatus.FAILED) { + exception = new JobExecutionException(jobId, "Job execution failed.", cause); + } else if (applicationStatus == ApplicationStatus.CANCELED) { + exception = new JobCancellationException(jobId, "Job was cancelled.", cause); + } else { + exception = new JobExecutionException(jobId, "Job completed with illegal application status: " + applicationStatus + '.', cause); } + return exception; + } + + private static Throwable getCause(final SerializedThrowable throwable, final ClassLoader classLoader) { + return throwable == null ? null : throwable.deserializeError(classLoader); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java index 9c43736..b8d1d90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.execution.JobClient; -import org.apache.flink.runtime.client.JobExecutionResultException; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; @@ -119,8 +118,7 @@ public final class MiniClusterJobClient implements JobClient, CoordinationReques try { return result.toJobExecutionResult(classLoader); } catch (Exception e) { - throw new CompletionException( - JobExecutionResultException.fromJobResult(result, classLoader, e)); + throw new CompletionException(e); } }); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java index a393178..e87bd48 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobExecutionResultException; import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.util.FlinkException; @@ -31,6 +32,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsNull.nullValue; import static org.junit.Assert.assertThat; @@ -110,9 +112,11 @@ public class JobResultTest extends TestLogger { try { jobResult.toJobExecutionResult(getClass().getClassLoader()); fail("Job should fail with an JobCancellationException."); - } catch (JobCancellationException expected) { + } catch (JobExecutionResultException expected) { + + assertThat(expected.getCause(), is(instanceOf(JobCancellationException.class))); // the failure cause in the execution graph should not be the cause of the canceled job result - assertThat(expected.getCause(), is(nullValue())); + assertThat(expected.getCause().getCause(), is(nullValue())); } } @@ -129,8 +133,11 @@ public class JobResultTest extends TestLogger { try { jobResult.toJobExecutionResult(getClass().getClassLoader()); fail("Job should fail with JobExecutionException."); - } catch (JobExecutionException expected) { - assertThat(expected.getCause(), is(equalTo(cause))); + } catch (JobExecutionResultException expected) { + + assertThat(expected.getCause(), is(instanceOf(JobExecutionException.class))); + // the failure cause in the execution graph should not be the cause of the canceled job result + assertThat(expected.getCause().getCause(), is(equalTo(cause))); } }
