ConfX created FLINK-38870:
-----------------------------
Summary: Uninformative error message when job is suspended due to
JobManager leadership loss
Key: FLINK-38870
URL: https://issues.apache.org/jira/browse/FLINK-38870
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 2.2.0
Reporter: ConfX
When a JobManager loses leadership (e.g., in HA scenarios), running jobs are
suspended. The job's `ExecutionGraph` transitions to `JobStatus.SUSPENDED` and
is archived. When a client later queries the job result via
`Dispatcher.requestJobResult()`, the system attempts to create a `JobResult`
from the archived execution graph.
The issue is that `ApplicationStatus.fromJobStatus()` only maps *globally
terminal* states:
- `FINISHED` → `SUCCEEDED`
- `FAILED` → `FAILED`
- `CANCELED` → `CANCELED`
But `SUSPENDED` is a *locally terminal* state (not globally terminal), so
it has no mapping and defaults to `UNKNOWN`:
{code:java}
// ApplicationStatus.java
public static ApplicationStatus fromJobStatus(JobStatus jobStatus) {
    return JOB_STATUS_APPLICATION_STATUS_BI_MAP.getOrDefault(jobStatus, UNKNOWN);
} {code}
This causes `JobResult.toJobExecutionResult()` to throw:
{code:java}
org.apache.flink.runtime.client.JobExecutionException: Job completed with
illegal application status: UNKNOWN. {code}
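The fallback that leads to this error can be reproduced in isolation. The following is a minimal sketch, assuming simplified stand-in enums that contain only the values discussed here; `StatusMappingSketch` and its nested types are hypothetical and are not Flink's actual classes.

```java
import java.util.EnumMap;
import java.util.Map;

public class StatusMappingSketch {
    // Stand-ins for Flink's JobStatus / ApplicationStatus (assumption: reduced to the values in this report).
    enum JobStatus { FINISHED, FAILED, CANCELED, SUSPENDED }
    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    // Only globally terminal states get an explicit mapping.
    private static final Map<JobStatus, ApplicationStatus> MAPPING = new EnumMap<>(JobStatus.class);
    static {
        MAPPING.put(JobStatus.FINISHED, ApplicationStatus.SUCCEEDED);
        MAPPING.put(JobStatus.FAILED, ApplicationStatus.FAILED);
        MAPPING.put(JobStatus.CANCELED, ApplicationStatus.CANCELED);
    }

    // Any state without a mapping, including SUSPENDED, falls back to UNKNOWN.
    static ApplicationStatus fromJobStatus(JobStatus jobStatus) {
        return MAPPING.getOrDefault(jobStatus, ApplicationStatus.UNKNOWN);
    }

    public static void main(String[] args) {
        for (JobStatus status : JobStatus.values()) {
            System.out.println(status + " -> " + fromJobStatus(status));
        }
    }
}
```

In this sketch the original `JobStatus` is lost as soon as the lookup returns `UNKNOWN`, which is why the resulting exception cannot say that the job was suspended.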
A better exception message would report the `SUSPENDED` job status and give
more informative context.
One possible fix:
Changes to `JobResult.java`:
1. Add `jobStatus` field to preserve the original state:
{code:java}
@Nullable private final JobStatus jobStatus; {code}
2. Update `toJobExecutionResult()` to handle `UNKNOWN` with the actual
`JobStatus`:
{code:java}
} else if (applicationStatus == ApplicationStatus.UNKNOWN) {
    if (jobStatus == JobStatus.SUSPENDED) {
        exception = new JobExecutionException(
                jobId,
                "Job is in state SUSPENDED. This commonly happens when the "
                        + "JobManager lost leadership. The job may recover "
                        + "automatically if High Availability and a persistent "
                        + "job store are configured. If recovery is not possible "
                        + "(e.g., non-persistent ExecutionPlanStore), the job "
                        + "needs to be resubmitted.",
                cause);
    } else {
        exception = new JobExecutionException(
                jobId,
                "Job reached a terminal state without a corresponding "
                        + "ApplicationStatus. JobStatus=" + jobStatus
                        + ", ApplicationStatus=" + applicationStatus + ".",
                cause);
    }
} {code}
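The message selection in step 2 can be exercised on its own. This is a rough sketch under stated assumptions: `SuspendedMessageSketch`, its nested `JobStatus` enum, and the `describeUnknownStatus` helper are hypothetical names introduced for illustration, and the real change would build a `JobExecutionException` inside `JobResult` rather than return a string.

```java
public class SuspendedMessageSketch {
    // Stand-in for Flink's JobStatus (assumption: reduced to the values in this report).
    enum JobStatus { FINISHED, FAILED, CANCELED, SUSPENDED }

    // Hypothetical helper: picks the message used when ApplicationStatus is UNKNOWN.
    // jobStatus may be null, mirroring the @Nullable field proposed in step 1.
    static String describeUnknownStatus(JobStatus jobStatus) {
        if (jobStatus == JobStatus.SUSPENDED) {
            return "Job is in state SUSPENDED. This commonly happens when the "
                    + "JobManager lost leadership. The job may recover "
                    + "automatically if High Availability and a persistent "
                    + "job store are configured. If recovery is not possible "
                    + "(e.g., non-persistent ExecutionPlanStore), the job "
                    + "needs to be resubmitted.";
        }
        // Generic fallback that still reports the original JobStatus.
        return "Job reached a terminal state without a corresponding "
                + "ApplicationStatus. JobStatus=" + jobStatus
                + ", ApplicationStatus=UNKNOWN.";
    }

    public static void main(String[] args) {
        System.out.println(describeUnknownStatus(JobStatus.SUSPENDED));
        System.out.println(describeUnknownStatus(null));
    }
}
```

Keeping the generic branch means any other unmapped or missing `JobStatus` still produces a message that names the state instead of only `UNKNOWN`.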
Before the fix:
{code:java}
org.apache.flink.runtime.client.JobExecutionException: Job completed with
illegal application status: UNKNOWN.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
{code}
After:
{code:java}
org.apache.flink.runtime.client.JobExecutionException: Job is in state
SUSPENDED. This commonly happens when the JobManager lost leadership. The job
may recover automatically if High Availability and a persistent job store are
configured. If recovery is not possible (e.g., non-persistent
ExecutionPlanStore), the job needs to be resubmitted.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:161)
{code}
Besides this fix, is anything else needed to handle the `UNKNOWN` <->
`SUSPENDED` case?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)