This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 40b0a31  [FLINK-24340] Only print exception on job failure/suspension
40b0a31 is described below

commit 40b0a317a17da3f1cd0b06a421edb6a050c63911
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Tue Sep 21 15:34:48 2021 +0200

    [FLINK-24340] Only print exception on job failure/suspension
---
 .../apache/flink/runtime/dispatcher/Dispatcher.java  | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e1bb919..e08fe58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -843,28 +843,36 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher
     protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGraphInfo) {
         final ArchivedExecutionGraph archivedExecutionGraph =
                 executionGraphInfo.getArchivedExecutionGraph();
+        final JobStatus terminalJobStatus = archivedExecutionGraph.getState();
         Preconditions.checkArgument(
-                archivedExecutionGraph.getState().isTerminalState(),
+                terminalJobStatus.isTerminalState(),
                 "Job %s is in state %s which is not terminal.",
                 archivedExecutionGraph.getJobID(),
-                archivedExecutionGraph.getState());
+                terminalJobStatus);
 
-        if (archivedExecutionGraph.getFailureInfo() != null) {
+        // the failureInfo contains the reason for why job was failed/suspended, but for
+        // finished/canceled jobs it may contain the last cause of a restart (if there were any)
+        // for finished/canceled jobs we don't want to print it because it is misleading
+        final boolean isFailureInfoRelatedToJobTermination =
+                terminalJobStatus == JobStatus.SUSPENDED || terminalJobStatus == JobStatus.FAILED;
+
+        if (archivedExecutionGraph.getFailureInfo() != null
+                && isFailureInfoRelatedToJobTermination) {
             log.info(
                     "Job {} reached terminal state {}.\n{}",
                     archivedExecutionGraph.getJobID(),
-                    archivedExecutionGraph.getState(),
+                    terminalJobStatus,
                     archivedExecutionGraph.getFailureInfo().getExceptionAsString().trim());
         } else {
             log.info(
                     "Job {} reached terminal state {}.",
                     archivedExecutionGraph.getJobID(),
-                    archivedExecutionGraph.getState());
+                    terminalJobStatus);
         }
 
         archiveExecutionGraph(executionGraphInfo);
 
-        return archivedExecutionGraph.getState().isGloballyTerminalState()
+        return terminalJobStatus.isGloballyTerminalState()
                 ? CleanupJobState.GLOBAL
                 : CleanupJobState.LOCAL;
     }

Reply via email to