This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 40b0a31  [FLINK-24340] Only print exception on job failure/suspension
40b0a31 is described below

commit 40b0a317a17da3f1cd0b06a421edb6a050c63911
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Tue Sep 21 15:34:48 2021 +0200

    [FLINK-24340] Only print exception on job failure/suspension
---
 .../apache/flink/runtime/dispatcher/Dispatcher.java | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e1bb919..e08fe58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -843,28 +843,36 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
         final ArchivedExecutionGraph archivedExecutionGraph =
                 executionGraphInfo.getArchivedExecutionGraph();
+        final JobStatus terminalJobStatus = archivedExecutionGraph.getState();
         Preconditions.checkArgument(
-                archivedExecutionGraph.getState().isTerminalState(),
+                terminalJobStatus.isTerminalState(),
                 "Job %s is in state %s which is not terminal.",
                 archivedExecutionGraph.getJobID(),
-                archivedExecutionGraph.getState());
+                terminalJobStatus);
 
-        if (archivedExecutionGraph.getFailureInfo() != null) {
+        // the failureInfo contains the reason for why job was failed/suspended, but for
+        // finished/canceled jobs it may contain the last cause of a restart (if there were any)
+        // for finished/canceled jobs we don't want to print it because it is misleading
+        final boolean isFailureInfoRelatedToJobTermination =
+                terminalJobStatus == JobStatus.SUSPENDED || terminalJobStatus == JobStatus.FAILED;
+
+        if (archivedExecutionGraph.getFailureInfo() != null
+                && isFailureInfoRelatedToJobTermination) {
             log.info(
                     "Job {} reached terminal state {}.\n{}",
                     archivedExecutionGraph.getJobID(),
-                    archivedExecutionGraph.getState(),
+                    terminalJobStatus,
                     archivedExecutionGraph.getFailureInfo().getExceptionAsString().trim());
         } else {
             log.info(
                     "Job {} reached terminal state {}.",
                     archivedExecutionGraph.getJobID(),
-                    archivedExecutionGraph.getState());
+                    terminalJobStatus);
         }
 
         archiveExecutionGraph(executionGraphInfo);
 
-        return archivedExecutionGraph.getState().isGloballyTerminalState()
+        return terminalJobStatus.isGloballyTerminalState()
                 ? CleanupJobState.GLOBAL
                 : CleanupJobState.LOCAL;
     }