ConfX created FLINK-38870:
-----------------------------

             Summary: Uninformative error message when job is suspended due to JobManager leadership loss
                 Key: FLINK-38870
                 URL: https://issues.apache.org/jira/browse/FLINK-38870
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 2.2.0
            Reporter: ConfX


When a JobManager loses leadership (e.g., in HA scenarios), running jobs are 
suspended. The job's `ExecutionGraph` transitions to `JobStatus.SUSPENDED` and 
is archived. When a client later queries the job result via 
`Dispatcher.requestJobResult()`, the system attempts to create a `JobResult` 
from the archived execution graph.

The issue is that `ApplicationStatus.fromJobStatus()` only maps *globally terminal* states:
 - `FINISHED` → `SUCCEEDED`
 - `FAILED` → `FAILED`
 - `CANCELED` → `CANCELED`

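But `SUSPENDED` is only a *locally terminal* state (execution has stopped on this JobManager, but the job may still be recovered, e.g. by a new leader), not a globally terminal one, so it has no mapping and falls back to `UNKNOWN`: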
{code:java}
// ApplicationStatus.java
public static ApplicationStatus fromJobStatus(JobStatus jobStatus) {
    return JOB_STATUS_APPLICATION_STATUS_BI_MAP.getOrDefault(jobStatus, UNKNOWN);
} {code}
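To make the fallback concrete, here is a minimal standalone model of this mapping. It assumes simplified stand-in enums (`JobState`, `AppStatus`, both hypothetical) rather than Flink's real `JobStatus` and `ApplicationStatus`, and uses a plain `EnumMap` in place of the bidirectional map that the field name suggests. It also shows that the mapping is lossy: every unmapped state collapses to `UNKNOWN`, so the original status cannot be recovered from the `ApplicationStatus` alone.
{code:java}
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, simplified model of ApplicationStatus.fromJobStatus().
// Only a subset of job states is modeled.
class StatusMappingSketch {

    enum JobState {
        RUNNING(false, false),
        FINISHED(true, true),
        FAILED(true, true),
        CANCELED(true, true),
        SUSPENDED(true, false); // terminal on this JobManager only

        final boolean terminal;
        final boolean globallyTerminal;

        JobState(boolean terminal, boolean globallyTerminal) {
            this.terminal = terminal;
            this.globallyTerminal = globallyTerminal;
        }
    }

    enum AppStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    // Only globally terminal states get an entry, as described in the issue.
    private static final Map<JobState, AppStatus> MAPPING = new EnumMap<>(JobState.class);

    static {
        MAPPING.put(JobState.FINISHED, AppStatus.SUCCEEDED);
        MAPPING.put(JobState.FAILED, AppStatus.FAILED);
        MAPPING.put(JobState.CANCELED, AppStatus.CANCELED);
    }

    static AppStatus fromJobState(JobState state) {
        // Any state without an entry, including SUSPENDED, falls back to UNKNOWN.
        return MAPPING.getOrDefault(state, AppStatus.UNKNOWN);
    }

    public static void main(String[] args) {
        // Print the mapping for every terminal state in the model.
        for (JobState s : JobState.values()) {
            if (s.terminal) {
                System.out.println(s + " -> " + fromJobState(s));
            }
        }
    }
}
{code}
Running `main` prints `FINISHED -> SUCCEEDED`, `FAILED -> FAILED`, `CANCELED -> CANCELED` and finally `SUSPENDED -> UNKNOWN`, which is the value that later reaches `toJobExecutionResult()`.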
This causes `JobResult.toJobExecutionResult()` to throw:
{code:java}
org.apache.flink.runtime.client.JobExecutionException: Job completed with illegal application status: UNKNOWN. {code}
A better error message would report the actual `SUSPENDED` job status and explain what it means.
One possible approach:

Changes to `JobResult.java`:
1. Add a `jobStatus` field to preserve the original state:
{code:java}
@Nullable private final JobStatus jobStatus; {code}
2. Update `toJobExecutionResult()` to handle `UNKNOWN` using the actual `JobStatus`:
{code:java}
} else if (applicationStatus == ApplicationStatus.UNKNOWN) {
    if (jobStatus == JobStatus.SUSPENDED) {
        exception = new JobExecutionException(
                jobId,
                "Job is in state SUSPENDED. This commonly happens when the "
                        + "JobManager lost leadership. The job may recover "
                        + "automatically if High Availability and a persistent "
                        + "job store are configured. If recovery is not possible "
                        + "(e.g., non-persistent ExecutionPlanStore), the job "
                        + "needs to be resubmitted.",
                cause);
    } else {
        exception = new JobExecutionException(
                jobId,
                "Job reached a terminal state without a corresponding "
                        + "ApplicationStatus. JobStatus=" + jobStatus
                        + ", ApplicationStatus=" + applicationStatus + ".",
                cause);
    }
} {code}
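The two changes can be exercised together in a small standalone model. This is a sketch under stated assumptions: `JobResultSketch`, `JobState`, `AppStatus`, `fromJobState()` and `unknownStatusMessage()` are hypothetical stand-ins for `JobResult`, `JobStatus`, `ApplicationStatus` and the `UNKNOWN` branch of `toJobExecutionResult()`; it returns the message instead of throwing a `JobExecutionException`, and the other branches are not modeled.
{code:java}
import java.util.Objects;

// Hypothetical model: keep the original JobStatus next to the lossy
// ApplicationStatus, and use it to build a clearer UNKNOWN message.
class JobResultSketch {

    enum JobState { RUNNING, FINISHED, FAILED, CANCELED, SUSPENDED }

    enum AppStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    private final AppStatus applicationStatus;
    private final JobState jobStatus; // nullable, like the proposed @Nullable field

    JobResultSketch(AppStatus applicationStatus, JobState jobStatus) {
        this.applicationStatus = Objects.requireNonNull(applicationStatus, "applicationStatus");
        this.jobStatus = jobStatus;
    }

    // Collapses states to an ApplicationStatus but also records the source state.
    static JobResultSketch fromJobState(JobState state) {
        AppStatus app;
        switch (state) {
            case FINISHED: app = AppStatus.SUCCEEDED; break;
            case FAILED: app = AppStatus.FAILED; break;
            case CANCELED: app = AppStatus.CANCELED; break;
            default: app = AppStatus.UNKNOWN; break;
        }
        return new JobResultSketch(app, state);
    }

    // Only the UNKNOWN branch is modeled; the real code would wrap this
    // message in a JobExecutionException together with the cause.
    String unknownStatusMessage() {
        if (applicationStatus != AppStatus.UNKNOWN) {
            throw new IllegalStateException("Only the UNKNOWN branch is modeled here");
        }
        if (jobStatus == JobState.SUSPENDED) {
            return "Job is in state SUSPENDED. This commonly happens when the "
                    + "JobManager lost leadership. The job may recover "
                    + "automatically if High Availability and a persistent "
                    + "job store are configured. If recovery is not possible "
                    + "(e.g., non-persistent ExecutionPlanStore), the job "
                    + "needs to be resubmitted.";
        }
        // jobStatus may be null if the result was created without it.
        return "Job reached a terminal state without a corresponding "
                + "ApplicationStatus. JobStatus=" + jobStatus
                + ", ApplicationStatus=" + applicationStatus + ".";
    }

    public static void main(String[] args) {
        System.out.println(fromJobState(JobState.SUSPENDED).unknownStatusMessage());
        System.out.println(new JobResultSketch(AppStatus.UNKNOWN, null).unknownStatusMessage());
    }
}
{code}
Keeping the field nullable means a result built without a `JobStatus` still falls through to the generic message (here `JobStatus=null`). If `JobResult` is also sent to clients over REST, its JSON (de)serialization would presumably need to carry the new field as well; the sketch does not cover that.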
Before the fix:
{code:java}
org.apache.flink.runtime.client.JobExecutionException: Job completed with illegal application status: UNKNOWN.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
{code}
After:
{code:java}
org.apache.flink.runtime.client.JobExecutionException: Job is in state SUSPENDED. This commonly happens when the JobManager lost leadership. The job may recover automatically if High Availability and a persistent job store are configured. If recovery is not possible (e.g., non-persistent ExecutionPlanStore), the job needs to be resubmitted.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:161)
{code}

Besides this fix, is anything else needed to handle the UNKNOWN <-> SUSPENDED mapping?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
