[ https://issues.apache.org/jira/browse/FLINK-39704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prashant Bhardwaj updated FLINK-39704:
--------------------------------------
    Description: 
In a Flink HA cluster, a job that has already reached the globally terminal 
FAILED state can be recovered and restarted with the same JobID if leadership 
is revoked/reacquired immediately after the terminal transition.

The race lives in JobMasterServiceLeadershipRunner (above the LeaderElection 
SPI), so it affects:
 - both Application mode and session mode, and
 - both Kubernetes HA (Fabric8 lease loss, e.g. holderIdentity cleared / 
renew-deadline missed) and ZooKeeper HA (Curator session expiry or LeaderLatch 
loss; both invoke ZooKeeperLeaderElectionDriver.notLeader(), which feeds the
same onRevokeLeadership path).
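A toy model can make the ordering concrete. This is a hypothetical sketch: ToyLeadershipRunner, onJobTerminated and the single-threaded queue are illustrative names, not Flink classes. The assumption that the terminal result is forwarded asynchronously and accepted only while the reporting leader session is still current is a reading of the issue title ("drops globally-terminal result when leadership is lost mid-completion"), not a description of the actual runner code.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy model of the race; names do not correspond to Flink classes.
class ToyLeadershipRunner {
    // Stands in for a single-threaded main-thread executor (FIFO order).
    private final Queue<Runnable> mainThread = new ArrayDeque<>();
    final CompletableFuture<String> resultFuture = new CompletableFuture<>();
    private long currentSession = 0; // 0 means "not the leader"
    private long nextSession = 1;

    long grantLeadership() {
        currentSession = nextSession++;
        return currentSession;
    }

    // Revocation closes the job path and reports a synthetic SUSPENDED result.
    void revokeLeadership() {
        currentSession = 0;
        resultFuture.complete("SUSPENDED");
    }

    // Assumption: the terminal status is forwarded asynchronously and only
    // accepted if the reporting session is still the current leader session.
    void onJobTerminated(long session, String status) {
        mainThread.add(() -> {
            if (session == currentSession) {
                resultFuture.complete(status);
            }
            // otherwise the globally terminal result is silently dropped
        });
    }

    void drainMainThread() {
        Runnable task;
        while ((task = mainThread.poll()) != null) {
            task.run();
        }
    }

    static String raceOutcome() {
        ToyLeadershipRunner runner = new ToyLeadershipRunner();
        long session = runner.grantLeadership();
        runner.onJobTerminated(session, "FAILED"); // FAILED is queued first...
        runner.revokeLeadership();                 // ...but revocation runs before it
        runner.drainMainThread();                  // the stale forward is discarded
        return runner.resultFuture.join();
    }

    public static void main(String[] args) {
        System.out.println(raceOutcome()); // prints SUSPENDED
    }
}
```

Because revocation completes the result with SUSPENDED before the queued FAILED forward runs, the late forward sees a stale session and is discarded; if the forward had run first, the later SUSPENDED completion would have been a no-op.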

Observed with apache/flink:2.2.0 on Kubernetes HA; the same code path exists 
for ZooKeeper HA.

*Timeline from repro (Kubernetes HA):*
{noformat}
20:52:51.075  Task failure after TaskManager deletion
20:52:51.119  Job e7ce38da0a5b4651ce64453d6ffaa25b switched RUNNING -> FAILING
20:52:51.122  Job e7ce38da0a5b4651ce64453d6ffaa25b switched FAILING -> FAILED
20:52:52.615  KubernetesLeaderElector observed empty leader holder
20:52:52.616  Leadership revoked
20:52:52.618  Dispatcher reported same job as terminal SUSPENDED
20:52:52.921  DefaultExecutionPlanStore released execution plan e7ce38da0a5b4651ce64453d6ffaa25b
20:52:52.926  Same job id was retrieved from KubernetesStateHandleStore
20:52:53.035  Same StreamGraph(jobId: e7ce38da0a5b4651ce64453d6ffaa25b) was recovered
20:53:11.340  Same job switched CREATED -> RUNNING
{noformat}

*Expected:*
Once a job reaches the globally terminal FAILED state, a later leadership
revocation or close should not overwrite or mask that result as SUSPENDED. HA
metadata should be cleaned up as for any other globally terminal job, and the
same job should not be recovered.
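The expected invariant can be sketched as a small result holder. This is a minimal, hypothetical sketch: TerminalResultHolder, ToyJobStatus and HaAction are illustrative names, not Flink APIs. It treats FAILED, FINISHED and CANCELED as globally terminal and SUSPENDED as only locally terminal, mirroring Flink's JobStatus semantics, and maps the recorded result to either permanent removal or release of HA metadata. Letting a late globally terminal status replace an earlier SUSPENDED is one possible design choice, not necessarily what a fix for this issue does.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical job status: FAILED, FINISHED and CANCELED are globally terminal;
// SUSPENDED is only locally terminal (another leader may resume the job).
enum ToyJobStatus {
    FINISHED(true), CANCELED(true), FAILED(true), SUSPENDED(false);

    final boolean globallyTerminal;

    ToyJobStatus(boolean globallyTerminal) {
        this.globallyTerminal = globallyTerminal;
    }
}

// What to do with the job's HA metadata once a result is recorded.
enum HaAction { REMOVE_PERMANENTLY, RELEASE_FOR_RECOVERY }

// Hypothetical holder enforcing "a globally terminal result is never masked".
class TerminalResultHolder {
    private final AtomicReference<ToyJobStatus> result = new AtomicReference<>();

    void offer(ToyJobStatus offered) {
        result.accumulateAndGet(offered, (current, next) -> {
            if (current == null) {
                return next; // first result is recorded
            }
            if (!current.globallyTerminal && next.globallyTerminal) {
                return next; // design choice: a late FAILED upgrades an earlier SUSPENDED
            }
            return current; // SUSPENDED never overwrites FAILED/FINISHED/CANCELED
        });
    }

    ToyJobStatus result() {
        return result.get();
    }

    HaAction haAction() {
        ToyJobStatus status = result.get();
        return status != null && status.globallyTerminal
                ? HaAction.REMOVE_PERMANENTLY
                : HaAction.RELEASE_FOR_RECOVERY;
    }

    public static void main(String[] args) {
        TerminalResultHolder holder = new TerminalResultHolder();
        holder.offer(ToyJobStatus.FAILED);    // real terminal transition
        holder.offer(ToyJobStatus.SUSPENDED); // revocation arrives afterwards
        System.out.println(holder.result() + " " + holder.haAction());
        // prints FAILED REMOVE_PERMANENTLY
    }
}
```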

*Actual:*
Leadership revocation closes the running JobMaster/Dispatcher path with a
synthetic SUSPENDED result after the real FAILED result has already been
produced. The execution plan is then released rather than permanently removed,
so the same JobID remains recoverable from HA storage (Kubernetes ConfigMaps or
ZooKeeper) and the job is started again.
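The difference between releasing and removing the execution plan can be illustrated with a toy store. Everything here is hypothetical: ToyExecutionPlanStore and its methods are not Flink's execution plan store API. It only models the property described above: a released entry stays in HA storage and is visible to the next leader, while a removed entry is gone.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of an HA execution-plan store; not Flink's API.
class ToyExecutionPlanStore {
    // Entries persisted in HA storage (ConfigMap / ZooKeeper node), keyed by JobID.
    private final Map<String, String> persisted = new LinkedHashMap<>();
    // JobIDs this JobManager currently owns (holds a lock on).
    private final Set<String> owned = new LinkedHashSet<>();

    void put(String jobId, String plan) {
        persisted.put(jobId, plan);
        owned.add(jobId);
    }

    // Release: give up ownership but keep the entry, so another leader can recover it.
    void release(String jobId) {
        owned.remove(jobId);
    }

    // Remove: delete the entry; appropriate once the job is globally terminal.
    void remove(String jobId) {
        owned.remove(jobId);
        persisted.remove(jobId);
    }

    boolean ownedByThisJobManager(String jobId) {
        return owned.contains(jobId);
    }

    // What a newly elected leader would pick up for recovery.
    Set<String> recoverableJobIds() {
        return new LinkedHashSet<>(persisted.keySet());
    }

    public static void main(String[] args) {
        String jobId = "e7ce38da0a5b4651ce64453d6ffaa25b";

        ToyExecutionPlanStore released = new ToyExecutionPlanStore();
        released.put(jobId, "plan");
        released.release(jobId); // what happens after the synthetic SUSPENDED
        System.out.println(released.recoverableJobIds()); // prints [e7ce38da0a5b4651ce64453d6ffaa25b]

        ToyExecutionPlanStore removed = new ToyExecutionPlanStore();
        removed.put(jobId, "plan");
        removed.remove(jobId); // what should happen after FAILED
        System.out.println(removed.recoverableJobIds()); // prints []
    }
}
```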

*Reproduction outline (Kubernetes HA):*
1. Run a Kubernetes HA application cluster with restart-strategy.type: none.
2. Use a persistent HA storage directory (high-availability.storageDir).
3. Delete the TaskManager so the job reaches FAILED.
4. Immediately after observing RUNNING -> FAILING, patch the cluster leader 
ConfigMap annotation holderIdentity to empty, forcing leadership loss/reacquire.
5. Observe FAILED followed by SUSPENDED/release/recovery of the same JobID.
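Step 5 can be checked mechanically against the JobManager log. The sketch assumes state transitions are logged in the form "Job <name> (<JobID>) switched from state <FROM> to <TO>."; that format is an assumption, so adjust the pattern to the actual log lines. ReanimationDetector is a hypothetical helper that flags any JobID that leaves CREATED again after reaching a globally terminal state.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for step 5; the log format below is an assumption.
class ReanimationDetector {
    // Assumed line shape: "Job <name> (<32-hex JobID>) switched from state <FROM> to <TO>."
    private static final Pattern TRANSITION = Pattern.compile(
            "\\(([0-9a-f]{32})\\) switched from state (\\w+) to (\\w+)");
    private static final Set<String> GLOBALLY_TERMINAL = Set.of("FAILED", "FINISHED", "CANCELED");

    // Returns JobIDs that leave CREATED again after having reached a globally terminal state.
    static Set<String> reanimatedJobs(List<String> logLines) {
        Set<String> terminated = new HashSet<>();
        Set<String> reanimated = new LinkedHashSet<>();
        for (String line : logLines) {
            Matcher m = TRANSITION.matcher(line);
            if (!m.find()) {
                continue;
            }
            String jobId = m.group(1);
            String from = m.group(2);
            String to = m.group(3);
            if (GLOBALLY_TERMINAL.contains(to)) {
                terminated.add(jobId);
            } else if ("CREATED".equals(from) && terminated.contains(jobId)) {
                reanimated.add(jobId);
            }
        }
        return reanimated;
    }

    public static void main(String[] args) {
        String id = "e7ce38da0a5b4651ce64453d6ffaa25b";
        List<String> log = List.of(
                "Job demo (" + id + ") switched from state RUNNING to FAILING.",
                "Job demo (" + id + ") switched from state FAILING to FAILED.",
                "Job demo (" + id + ") switched from state CREATED to RUNNING.");
        System.out.println(reanimatedJobs(log)); // prints [e7ce38da0a5b4651ce64453d6ffaa25b]
    }
}
```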

Equivalent ZooKeeper HA trigger (not yet reproduced, but same code path): force
a Curator session expiry on the JobManager (e.g. partition it from the ZK
ensemble, or pause the ensemble, for longer than
high-availability.zookeeper.client.session-timeout) immediately after the job
transitions to FAILED. The resulting LeaderLatch loss triggers notLeader(),
which delivers onRevokeLeadership within the same race window.

  was:
In a Flink HA cluster, a job that has already reached the globally terminal 
FAILED state can be recovered and restarted with the same JobID if leadership 
is revoked/reacquired immediately after the terminal transition.

The race lives in JobMasterServiceLeadershipRunner (above the LeaderElection 
SPI), so it affects:
- both Application mode and session mode, and
- both Kubernetes HA (Fabric8 lease loss, e.g. holderIdentity cleared / 
renew-deadline missed) and ZooKeeper HA (Curator session expiry or LeaderLatch 
loss — both invoke ZooKeeperLeaderElectionDriver.notLeader() which feeds the 
same onRevokeLeadership path).

Observed with apache/flink:2.2.0 on Kubernetes HA; the same code path exists 
for ZooKeeper HA.

*Timeline from repro (Kubernetes HA):*
{noformat}
20:52:51.075  Task failure after TaskManager deletion
20:52:51.119  Job e7ce38da0a5b4651ce64453d6ffaa25b switched RUNNING -> FAILING
20:52:51.122  Job e7ce38da0a5b4651ce64453d6ffaa25b switched FAILING -> FAILED
20:52:52.615  KubernetesLeaderElector observed empty leader holder
20:52:52.616  Leadership revoked
20:52:52.618  Dispatcher reported same job as terminal SUSPENDED
20:52:52.921  DefaultExecutionPlanStore released execution plan e7ce38da0a5b4651ce64453d6ffaa25b
20:52:52.926  Same job id was retrieved from KubernetesStateHandleStore
20:52:53.035  Same StreamGraph(jobId: e7ce38da0a5b4651ce64453d6ffaa25b) was recovered
20:53:11.340  Same job switched CREATED -> RUNNING
{noformat}

*Expected:*
Once a job reaches globally terminal FAILED, later leadership revocation/close 
should not overwrite or mask the globally terminal result as SUSPENDED. HA 
metadata should be cleaned up as for a globally terminal job, and the same job 
should not be recovered.

*Actual:*
Leadership revocation closes the running JobMaster/Dispatcher path with a
synthetic SUSPENDED result after the real FAILED result has already been
produced. The execution plan is then released rather than permanently removed,
so the same JobID remains recoverable from HA storage (Kubernetes ConfigMaps or
ZooKeeper) and the job is started again.

A secondary issue is also visible in the same churn window:
DefaultLeaderElectionService receives a grant while issuedLeaderSessionID is 
already set and throws:

{noformat}
java.lang.IllegalStateException:
The leadership should have been granted while not having the leadership 
acquired.
{noformat}

This crashes the JobManager entrypoint, but only after the reanimation has
already happened: by the time of the fatal error, the failed job had been
released and recovered from HA metadata.
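The failing check can be approximated with a hypothetical toy service; ToyLeaderElectionService is not Flink's DefaultLeaderElectionService. It only illustrates one ordering that trips a guard with this message: a second grant is processed before the revocation that should have cleared the issued session ID.

```java
import java.util.UUID;

// Hypothetical simplification of the guard behind the IllegalStateException above;
// not Flink's DefaultLeaderElectionService.
class ToyLeaderElectionService {
    private UUID issuedLeaderSessionId; // null while this process is not the leader

    void onGrantLeadership(UUID newSessionId) {
        if (issuedLeaderSessionId != null) {
            throw new IllegalStateException(
                    "The leadership should have been granted while not having the leadership acquired.");
        }
        issuedLeaderSessionId = newSessionId;
    }

    void onRevokeLeadership() {
        issuedLeaderSessionId = null;
    }

    public static void main(String[] args) {
        ToyLeaderElectionService service = new ToyLeaderElectionService();
        service.onGrantLeadership(UUID.randomUUID());
        try {
            // The grant for the reacquired leadership is processed before the revocation.
            service.onGrantLeadership(UUID.randomUUID());
        } catch (IllegalStateException e) {
            System.out.println("fatal: " + e.getMessage());
        }
    }
}
```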

*Reproduction outline (Kubernetes HA):*
1. Run a Kubernetes HA application cluster with restart-strategy.type: none.
2. Use a persistent HA storage dir.
3. Delete the TaskManager so the job reaches FAILED.
4. Immediately after observing RUNNING -> FAILING, patch the cluster leader 
ConfigMap annotation holderIdentity to empty, forcing leadership loss/reacquire.
5. Observe FAILED followed by SUSPENDED/release/recovery of the same JobID.

Equivalent ZooKeeper HA trigger (not yet reproduced, but same code path): force 
a Curator session expiry on the JobManager (e.g. partition or pause the ZK 
ensemble past zookeeper.session-timeout) immediately after the job transitions 
to FAILED. LeaderLatch loss → notLeader() → onRevokeLeadership arrives during 
the same race window.


> JobMasterServiceLeadershipRunner drops globally-terminal result when 
> leadership is lost mid-completion
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39704
>                 URL: https://issues.apache.org/jira/browse/FLINK-39704
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.20.4, 2.1.2, 2.0.2, 2.3.0, 2.2.1, 2.4.0
>            Reporter: Prashant Bhardwaj
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: flink-config-ha-reanimation-repro-cm-2026-05-19.yaml, 
> jm-ha-reanimation-repro-current-2026-05-18.log, 
> jm-ha-reanimation-repro-events-2026-05-18.txt, 
> jm-ha-reanimation-repro-pod-describe-2026-05-18.txt, 
> jm-ha-reanimation-repro-previous-2026-05-18.log



--
This message was sent by Atlassian Jira
(v8.20.10#820010)