[ 
https://issues.apache.org/jira/browse/FLINK-39704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18082059#comment-18082059
 ] 

Matthias Pohl commented on FLINK-39704:
---------------------------------------

The code and the logs indeed show that there is a race condition 
when terminating a job while leadership is lost. This issue is not limited 
to clusters in Application Mode; it also affects clusters in Session Mode. I 
guess I was initially misled by the term "Kubernetes HA application 
cluster" in the ticket description.

Here's the race that I think we're observing:
 # Scheduler completes the inner {{JobMasterServiceProcess.resultFuture}} with 
{{FAILED}}.
 # {{forwardResultFuture}} in 
[JobMasterServiceLeadershipRunner:334|https://github.com/apache/flink/blob/26a0012b14af1adba2a4772bfc257e4f6196b9b8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L334]
 hands that off to {{runIfValidLeader}} in 
[JobMasterServiceLeadershipRunner:359|https://github.com/apache/flink/blob/26a0012b14af1adba2a4772bfc257e4f6196b9b8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L359]
 which calls {{leaderElection.hasLeadershipAsync(...)}}, an async K8s 
round-trip.
 # Before that check returns, the {{KubernetesLeaderElector}} sees leadership 
being revoked on the JobManager side (line 785ff in 
{{jm-ha-reanimation-repro-previous-2026-05-18.log}}) after a new leader was 
elected.
 # The forwarded {{FAILED}} result is dropped via {{noLeaderFallback}} 
(which only logs at debug level); meanwhile, the dispatcher stops the session 
leader process → {{LeadershipRunner.closeAsync()}} runs → completes the outer 
resultFuture with {{SUSPENDED}}.
 # The dispatcher’s {{jobReachedTerminalState}} sees {{SUSPENDED}}, which is not 
a globally terminal state. As a result, the job cleanup doesn't happen due to the 
early return in 
[Dispatcher:2295|https://github.com/apache/flink/blob/22598624741dbc785ef2bd38d9aef65b2c395932/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L2295],
 which skips persisting the job to the JobResultStore (JRS) and cleaning up the HA data. The JM 
failover causes the JobGraph to be resubmitted (because the {{JobGraphStore}} 
entry wasn't removed).
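
The race in the steps above can be replayed in miniature with plain {{CompletableFuture}}s. This is a simplified, single-threaded model, not Flink code: {{JobStatus}} is a stand-in enum, the {{hasLeadership}} future stands in for {{leaderElection.hasLeadershipAsync(...)}}, and {{onJobReachedTerminalState}} is a hypothetical helper that only mirrors the early return in the dispatcher. It shows that once the leadership check fails, nothing forwards {{FAILED}}, so the close path's {{SUSPENDED}} is the only result the dispatcher sees.

```java
import java.util.concurrent.CompletableFuture;

public class LeadershipRaceSketch {

    // Simplified stand-in for Flink's JobStatus; only the states needed here.
    enum JobStatus {
        FAILED(true),
        SUSPENDED(false);

        final boolean globallyTerminal;

        JobStatus(boolean globallyTerminal) {
            this.globallyTerminal = globallyTerminal;
        }
    }

    /** Plays steps 1-4 in the problematic order and returns what the outer future holds. */
    static JobStatus runRace() {
        CompletableFuture<JobStatus> innerResult = new CompletableFuture<>();
        CompletableFuture<JobStatus> outerResult = new CompletableFuture<>();
        // Stand-in for leaderElection.hasLeadershipAsync(...); completed by hand below
        // so that the ordering of events is deterministic.
        CompletableFuture<Boolean> hasLeadership = new CompletableFuture<>();

        // Step 2: forward the inner result only if leadership is still confirmed.
        innerResult.thenAcceptBoth(hasLeadership, (status, isLeader) -> {
            if (isLeader) {
                outerResult.complete(status);
            }
            // Otherwise this models the "noLeaderFallback" path: FAILED is dropped.
        });

        innerResult.complete(JobStatus.FAILED);    // step 1: scheduler reports FAILED
        hasLeadership.complete(false);             // step 3: leadership was revoked meanwhile
        outerResult.complete(JobStatus.SUSPENDED); // step 4: closeAsync() reports SUSPENDED
        return outerResult.join();
    }

    /** Step 5: hypothetical mirror of the dispatcher's early return for non-global states. */
    static String onJobReachedTerminalState(JobStatus status) {
        if (!status.globallyTerminal) {
            return "skip JRS write and HA cleanup";
        }
        return "write to JRS and clean up HA data";
    }

    public static void main(String[] args) {
        JobStatus observed = runRace();
        // prints "SUSPENDED: skip JRS write and HA cleanup"
        System.out.println(observed + ": " + onJobReachedTerminalState(observed));
    }
}
```

In this model, completing {{outerResult}} before {{hasLeadership}} gives the same outcome: the {{FAILED}} value never reaches the outer future because the leadership check fails.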

> Kubernetes HA can recover a globally terminal FAILED application job after 
> leadership revoke/reacquire
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39704
>                 URL: https://issues.apache.org/jira/browse/FLINK-39704
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 2.2.0, 2.4.0
>            Reporter: Prashant Bhardwaj
>            Priority: Major
>         Attachments: flink-config-ha-reanimation-repro-cm-2026-05-19.yaml, 
> jm-ha-reanimation-repro-current-2026-05-18.log, 
> jm-ha-reanimation-repro-events-2026-05-18.txt, 
> jm-ha-reanimation-repro-pod-describe-2026-05-18.txt, 
> jm-ha-reanimation-repro-previous-2026-05-18.log
>
>
> In a Kubernetes HA application cluster, a job that has already reached the 
> globally terminal FAILED state can be recovered and restarted with the same 
> JobID if Kubernetes leadership is revoked/reacquired immediately after the 
> terminal transition.
> Observed with apache/flink:2.2.0 and Kubernetes HA.
> *Timeline from repro:*
> {noformat}
> 20:52:51.075  Task failure after TaskManager deletion
> 20:52:51.119  Job e7ce38da0a5b4651ce64453d6ffaa25b switched RUNNING -> FAILING
> 20:52:51.122  Job e7ce38da0a5b4651ce64453d6ffaa25b switched FAILING -> FAILED
> 20:52:52.615  KubernetesLeaderElector observed empty leader holder
> 20:52:52.616  Leadership revoked
> 20:52:52.618  Dispatcher reported same job as terminal SUSPENDED
> 20:52:52.921  DefaultExecutionPlanStore released execution plan e7ce38da0a5b4651ce64453d6ffaa25b
> 20:52:52.926  Same job id was retrieved from KubernetesStateHandleStore
> 20:52:53.035  Same StreamGraph(jobId: e7ce38da0a5b4651ce64453d6ffaa25b) was recovered
> 20:53:11.340  Same job switched CREATED -> RUNNING
> {noformat}
> *Expected:*
> Once a job reaches globally terminal FAILED, later leadership 
> revocation/close should not overwrite or mask the globally terminal result as 
> SUSPENDED. HA metadata should be cleaned up as a globally terminal job, and 
> the same job should not be recovered.
> *Actual:*
> Leadership revocation closes the running JobMaster/Dispatcher path with a 
> synthetic SUSPENDED result after the real FAILED result. The execution plan is 
> released rather than permanently removed, so the same job id remains 
> recoverable from Kubernetes HA storage and is started again.
> A secondary issue is also visible in the same churn window:
> DefaultLeaderElectionService receives a grant while issuedLeaderSessionID is 
> already set and throws:
> {noformat}
> java.lang.IllegalStateException:
> The leadership should have been granted while not having the leadership acquired.
> {noformat}
> This crashes the JobManager entrypoint, but the reanimation had already 
> happened before the fatal error: the failed job was released/recovered from 
> HA metadata.
> *Reproduction outline:*
> 1. Run a Kubernetes HA application cluster with restart-strategy.type: none.
> 2. Use a persistent HA storage dir.
> 3. Delete the TaskManager so the job reaches FAILED.
> 4. Immediately after observing RUNNING -> FAILING, patch the cluster leader 
> ConfigMap annotation holderIdentity to empty, forcing leadership 
> loss/reacquire.
> 5. Observe FAILED followed by SUSPENDED/release/recovery of the same JobID.

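The invariant from the *Expected* section of the description (a globally terminal result, once observed, must not be masked by a later close) can be expressed as a small guard. This is a hypothetical sketch, not Flink's API and not a proposed fix: {{TerminalResultGuard}}, {{onSchedulerResult}} and {{resultOnClose}} are made-up names, {{JobStatus}} is a stand-in enum, and the sketch deliberately ignores whether a runner that has lost leadership is allowed to act on the remembered result.

```java
import java.util.concurrent.atomic.AtomicReference;

public class TerminalResultGuard {

    // Simplified stand-in for Flink's JobStatus.
    enum JobStatus {
        RUNNING, FAILED, CANCELED, FINISHED, SUSPENDED;

        boolean isGloballyTerminal() {
            return this == FAILED || this == CANCELED || this == FINISHED;
        }
    }

    // The first globally terminal status reported by the scheduler, if any.
    private final AtomicReference<JobStatus> firstGloballyTerminal = new AtomicReference<>();

    /** Records the scheduler's status before any (async) leadership check happens. */
    void onSchedulerResult(JobStatus status) {
        if (status.isGloballyTerminal()) {
            // Only the first globally terminal status is kept.
            firstGloballyTerminal.compareAndSet(null, status);
        }
    }

    /** Status to report when the runner is closed, e.g. because leadership was revoked. */
    JobStatus resultOnClose() {
        JobStatus terminal = firstGloballyTerminal.get();
        return terminal != null ? terminal : JobStatus.SUSPENDED;
    }

    public static void main(String[] args) {
        TerminalResultGuard guard = new TerminalResultGuard();
        System.out.println(guard.resultOnClose()); // SUSPENDED: nothing terminal seen yet
        guard.onSchedulerResult(JobStatus.FAILED);
        System.out.println(guard.resultOnClose()); // FAILED: the close no longer masks it
    }
}
```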

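The secondary {{IllegalStateException}} in the description follows from a simple precondition: a grant is only valid while no leader session ID is issued, so two grants without a revoke in between must fail. The class below is a simplified, hypothetical model of that check (only the exception message and the notion of an issued session ID are taken from the report); it is not the actual {{DefaultLeaderElectionService}} code.

```java
import java.util.UUID;

public class LeaderSessionModel {

    // Models the issuedLeaderSessionID mentioned in the report; null while not leading.
    private UUID issuedLeaderSessionId;

    void grantLeadership(UUID newSessionId) {
        if (issuedLeaderSessionId != null) {
            throw new IllegalStateException(
                    "The leadership should have been granted while not having the leadership acquired.");
        }
        issuedLeaderSessionId = newSessionId;
    }

    void revokeLeadership() {
        issuedLeaderSessionId = null;
    }

    public static void main(String[] args) {
        LeaderSessionModel model = new LeaderSessionModel();
        model.grantLeadership(UUID.randomUUID());
        model.revokeLeadership();
        model.grantLeadership(UUID.randomUUID()); // accepted: a revoke came in between
        try {
            model.grantLeadership(UUID.randomUUID()); // second grant without a revoke
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```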

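Step 5 of the reproduction outline can also be checked mechanically: once a JobID has reached a globally terminal status, any later status for the same JobID (such as the SUSPENDED and CREATED -> RUNNING entries in the repro timeline) indicates the reanimation. A minimal sketch, assuming the status transitions have already been extracted from the JobManager log into a list; {{ReanimationCheck}} and {{Event}} are hypothetical helpers, not part of Flink.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReanimationCheck {

    /** One job status observation, e.g. parsed from a JobManager log line. */
    static final class Event {
        final String jobId;
        final String status;

        Event(String jobId, String status) {
            this.jobId = jobId;
            this.status = status;
        }
    }

    // Globally terminal states in Flink; SUSPENDED is deliberately not among them.
    static final Set<String> GLOBALLY_TERMINAL = Set.of("FAILED", "CANCELED", "FINISHED");

    /** Returns the first JobID that reports a status after a globally terminal one, or null. */
    static String firstReanimatedJob(List<Event> events) {
        Set<String> terminated = new HashSet<>();
        for (Event e : events) {
            if (terminated.contains(e.jobId)) {
                return e.jobId;
            }
            if (GLOBALLY_TERMINAL.contains(e.status)) {
                terminated.add(e.jobId);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String job = "e7ce38da0a5b4651ce64453d6ffaa25b";
        // Statuses from the repro timeline, in order.
        List<Event> timeline = List.of(
                new Event(job, "FAILING"),
                new Event(job, "FAILED"),
                new Event(job, "SUSPENDED"),
                new Event(job, "RUNNING"));
        System.out.println(firstReanimatedJob(timeline)); // prints the JobID
    }
}
```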
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
