[ https://issues.apache.org/jira/browse/FLINK-39704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Prashant Bhardwaj updated FLINK-39704:
--------------------------------------
Description:
In a Flink HA cluster, a job that has already reached the globally terminal
FAILED state can be recovered and restarted with the same JobID if leadership
is revoked/reacquired immediately after the terminal transition.
The race lives in JobMasterServiceLeadershipRunner (above the LeaderElection
SPI), so it affects:
- both Application mode and session mode, and
- both Kubernetes HA (Fabric8 lease loss, e.g. holderIdentity cleared /
renew-deadline missed) and ZooKeeper HA (Curator session expiry or LeaderLatch
loss — both invoke ZooKeeperLeaderElectionDriver.notLeader() which feeds the
same onRevokeLeadership path).
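A toy model shows why the HA backend does not matter here. Both drivers end up calling one revocation callback, so a race in the component above that callback affects both. This is a minimal sketch, assuming hypothetical names (RevocationListener, FakeK8sDriver, FakeZkDriver, RecordingRunner). It does not use Flink's actual LeaderElection classes.

```java
import java.util.ArrayList;
import java.util.List;

class RevocationFunnelDemo {
    public static void main(String[] args) {
        RecordingRunner runner = new RecordingRunner();
        new FakeK8sDriver(runner).leaseLost();
        new FakeZkDriver(runner).notLeader();
        // Both backends end up in the same runner callback.
        System.out.println(runner.revocations);
    }
}

// Hypothetical stand-in for the backend-agnostic revocation callback.
interface RevocationListener {
    void onRevokeLeadership(String cause);
}

// Hypothetical Kubernetes-style driver: losing the lease revokes leadership.
final class FakeK8sDriver {
    private final RevocationListener listener;

    FakeK8sDriver(RevocationListener listener) { this.listener = listener; }

    void leaseLost() { listener.onRevokeLeadership("k8s: holderIdentity cleared"); }
}

// Hypothetical ZooKeeper-style driver: notLeader() revokes leadership.
final class FakeZkDriver {
    private final RevocationListener listener;

    FakeZkDriver(RevocationListener listener) { this.listener = listener; }

    void notLeader() { listener.onRevokeLeadership("zk: LeaderLatch lost"); }
}

// Hypothetical runner: records revocations without knowing which backend sent them.
final class RecordingRunner implements RevocationListener {
    final List<String> revocations = new ArrayList<>();

    @Override
    public void onRevokeLeadership(String cause) { revocations.add(cause); }
}
```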
Observed with apache/flink:2.2.0 on Kubernetes HA; the same code path exists
for ZooKeeper HA.
*Timeline from repro (Kubernetes HA):*
{noformat}
20:52:51.075 Task failure after TaskManager deletion
20:52:51.119 Job e7ce38da0a5b4651ce64453d6ffaa25b switched RUNNING -> FAILING
20:52:51.122 Job e7ce38da0a5b4651ce64453d6ffaa25b switched FAILING -> FAILED
20:52:52.615 KubernetesLeaderElector observed empty leader holder
20:52:52.616 Leadership revoked
20:52:52.618 Dispatcher reported same job as terminal SUSPENDED
20:52:52.921 DefaultExecutionPlanStore released execution plan e7ce38da0a5b4651ce64453d6ffaa25b
20:52:52.926 Same job id was retrieved from KubernetesStateHandleStore
20:52:53.035 Same StreamGraph(jobId: e7ce38da0a5b4651ce64453d6ffaa25b) was recovered
20:53:11.340 Same job switched CREATED -> RUNNING
{noformat}
*Expected:*
Once a job reaches globally terminal FAILED, later leadership revocation/close
should not overwrite or mask the globally terminal result as SUSPENDED. HA
metadata should be cleaned up as for a globally terminal job, and the same job
should not be recovered.
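The expected behaviour can be read as a "first reported result wins" rule. A revocation may only produce SUSPENDED if no real terminal result has been recorded yet. This is a minimal sketch, assuming a hypothetical ResultHolder class and a local ReportedStatus enum. It is not Flink's actual fix. It relies only on the fact that CompletableFuture.complete() returns false and changes nothing once the future is already completed.

```java
import java.util.concurrent.CompletableFuture;

class StickyResultDemo {
    public static void main(String[] args) {
        ResultHolder holder = new ResultHolder();
        holder.report(ReportedStatus.FAILED);    // real job outcome arrives first
        holder.report(ReportedStatus.SUSPENDED); // late revocation must not mask it
        System.out.println(holder.result().join()); // FAILED
    }
}

// Local stand-in for the two statuses that race in the report.
enum ReportedStatus { FAILED, SUSPENDED }

// Hypothetical holder: the first reported status becomes the job's result.
final class ResultHolder {
    private final CompletableFuture<ReportedStatus> result = new CompletableFuture<>();

    /** Returns true only if this report became the job's result. */
    boolean report(ReportedStatus status) {
        // complete() is a no-op once the future is done, so a SUSPENDED
        // produced by a later revocation cannot overwrite FAILED.
        return result.complete(status);
    }

    CompletableFuture<ReportedStatus> result() { return result; }
}
```

If leadership is revoked while the job is still running, SUSPENDED is reported first and remains the result, which is the intended behaviour for a non-terminal job.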
*Actual:*
Leadership revocation closes the running JobMaster/Dispatcher path with a
synthetic SUSPENDED status after the real FAILED result has already been
reached. Because the job is reported as SUSPENDED, the execution plan is
released rather than permanently removed. The same JobID therefore remains
recoverable from HA storage (Kubernetes ConfigMaps or ZooKeeper) and is
started again.
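A toy in-memory store illustrates the difference between releasing and removing the execution plan. The following are hypothetical: ToyPlanStore, its release/remove methods, and the PlanCleanup.cleanupFor helper. They only approximate the semantics described here and are not the ExecutionPlanStore API. Release drops this JobManager's claim, but the persisted plan stays recoverable by the next leader. Remove deletes the plan. The JobState enum follows Flink's convention that SUSPENDED is only locally terminal.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class PlanCleanupDemo {
    public static void main(String[] args) {
        String jobId = "e7ce38da0a5b4651ce64453d6ffaa25b";

        ToyPlanStore actual = new ToyPlanStore();
        actual.put(jobId, "streamGraph");
        PlanCleanup.cleanupFor(actual, jobId, JobState.SUSPENDED); // synthetic status
        System.out.println("actual:   " + actual.recoverableJobIds());

        ToyPlanStore expected = new ToyPlanStore();
        expected.put(jobId, "streamGraph");
        PlanCleanup.cleanupFor(expected, jobId, JobState.FAILED); // real status
        System.out.println("expected: " + expected.recoverableJobIds());
    }
}

enum JobState {
    FAILED, CANCELED, FINISHED, SUSPENDED;

    // SUSPENDED is only locally terminal; the others are globally terminal.
    boolean isGloballyTerminal() { return this != SUSPENDED; }
}

// Hypothetical store: "persisted" survives leader changes, like HA metadata.
final class ToyPlanStore {
    private final Map<String, String> persisted = new HashMap<>();
    private final Set<String> heldLocally = new HashSet<>();

    void put(String jobId, String plan) {
        persisted.put(jobId, plan);
        heldLocally.add(jobId);
    }

    // Release: drop the local claim only; the persisted plan stays recoverable.
    void release(String jobId) { heldLocally.remove(jobId); }

    // Remove: delete the persisted plan so no future leader can recover it.
    void remove(String jobId) {
        heldLocally.remove(jobId);
        persisted.remove(jobId);
    }

    Set<String> recoverableJobIds() { return new TreeSet<>(persisted.keySet()); }
}

// Hypothetical cleanup decision keyed on the reported status.
final class PlanCleanup {
    static void cleanupFor(ToyPlanStore store, String jobId, JobState reported) {
        if (reported.isGloballyTerminal()) {
            store.remove(jobId);
        } else {
            store.release(jobId);
        }
    }
}
```

Running main prints the JobID under "actual" and an empty set under "expected", which matches the reanimation described in this issue.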
A secondary issue is also visible in the same churn window:
DefaultLeaderElectionService receives a grant while issuedLeaderSessionID is
already set and throws:
{noformat}
java.lang.IllegalStateException:
The leadership should have been granted while not having the leadership
acquired.
{noformat}
This crashes the JobManager entrypoint, but only after the reanimation has
already happened: the failed job's execution plan was released and then
recovered from HA metadata.
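The secondary failure comes from a state-machine guard: a grant is only legal while no leader session is issued. This is a minimal sketch, assuming a hypothetical ToyElectionService that tracks a single issuedLeaderSessionID. The exception message is copied from the log above, but the class is not Flink's DefaultLeaderElectionService. It shows how a second grant trips the check when it arrives before the revoke for the first session has been processed.

```java
import java.util.UUID;

class GrantGuardDemo {
    public static void main(String[] args) {
        ToyElectionService service = new ToyElectionService();
        service.onGrantLeadership(UUID.randomUUID());
        try {
            // A second grant arrives before the revoke for the first session.
            service.onGrantLeadership(UUID.randomUUID());
        } catch (IllegalStateException e) {
            System.out.println("fatal: " + e.getMessage());
        }
    }
}

// Hypothetical election service with a single issued session ID.
final class ToyElectionService {
    private UUID issuedLeaderSessionID; // null while not leader

    synchronized void onGrantLeadership(UUID newSessionId) {
        if (issuedLeaderSessionID != null) {
            throw new IllegalStateException(
                "The leadership should have been granted while not having the leadership acquired.");
        }
        issuedLeaderSessionID = newSessionId;
    }

    synchronized void onRevokeLeadership() {
        issuedLeaderSessionID = null;
    }

    synchronized boolean hasLeadership() { return issuedLeaderSessionID != null; }
}
```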
*Reproduction outline (Kubernetes HA):*
1. Run a Kubernetes HA application cluster with restart-strategy.type: none.
2. Use a persistent HA storage dir.
3. Delete the TaskManager so the job reaches FAILED.
4. Immediately after observing RUNNING -> FAILING, patch the cluster leader
ConfigMap annotation holderIdentity to empty, forcing leadership loss/reacquire.
5. Observe FAILED followed by SUSPENDED/release/recovery of the same JobID.
Equivalent ZooKeeper HA trigger (not yet reproduced, but same code path): force
a Curator session expiry on the JobManager (e.g. partition or pause the ZK
ensemble past zookeeper.session-timeout) immediately after the job transitions
to FAILED. LeaderLatch loss → notLeader() → onRevokeLeadership arrives during
the same race window.
was:
In a Kubernetes HA application cluster, a job that has already reached the
globally terminal FAILED state can be recovered and restarted with the same
JobID if Kubernetes leadership is revoked/reacquired immediately after the
terminal transition.
Observed with apache/flink:2.2.0 and Kubernetes HA.
*Timeline from repro:*
{noformat}
20:52:51.075 Task failure after TaskManager deletion
20:52:51.119 Job e7ce38da0a5b4651ce64453d6ffaa25b switched RUNNING -> FAILING
20:52:51.122 Job e7ce38da0a5b4651ce64453d6ffaa25b switched FAILING -> FAILED
20:52:52.615 KubernetesLeaderElector observed empty leader holder
20:52:52.616 Leadership revoked
20:52:52.618 Dispatcher reported same job as terminal SUSPENDED
20:52:52.921 DefaultExecutionPlanStore released execution plan e7ce38da0a5b4651ce64453d6ffaa25b
20:52:52.926 Same job id was retrieved from KubernetesStateHandleStore
20:52:53.035 Same StreamGraph(jobId: e7ce38da0a5b4651ce64453d6ffaa25b) was recovered
20:53:11.340 Same job switched CREATED -> RUNNING
{noformat}
*Expected:*
Once a job reaches globally terminal FAILED, later leadership revocation/close
should not overwrite or mask the globally terminal result as SUSPENDED. HA
metadata should be cleaned up as for a globally terminal job, and the same job
should not be recovered.
*Actual:*
Leadership revocation closes the running JobMaster/Dispatcher path with
synthetic SUSPENDED after the real FAILED result. The execution plan is
released rather than permanently removed, so the same job id remains
recoverable from Kubernetes HA storage and is started again.
A secondary issue is also visible in the same churn window:
DefaultLeaderElectionService receives a grant while issuedLeaderSessionID is
already set and throws:
java.lang.IllegalStateException:
The leadership should have been granted while not having the leadership
acquired.
This crashes the JobManager entrypoint, but the reanimation has already
happened before the fatal error: the failed job was released/recovered from HA
metadata.
*Reproduction outline:*
1. Run a Kubernetes HA application cluster with restart-strategy.type: none.
2. Use a persistent HA storage dir.
3. Delete the TaskManager so the job reaches FAILED.
4. Immediately after observing RUNNING -> FAILING, patch the cluster leader
ConfigMap annotation holderIdentity to empty, forcing leadership loss/reacquire.
5. Observe FAILED followed by SUSPENDED/release/recovery of the same JobID.
> Kubernetes HA can recover a globally terminal FAILED application job after
> leadership revoke/reacquire
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39704
> URL: https://issues.apache.org/jira/browse/FLINK-39704
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 2.2.0, 2.4.0
> Reporter: Prashant Bhardwaj
> Priority: Major
> Attachments: flink-config-ha-reanimation-repro-cm-2026-05-19.yaml,
> jm-ha-reanimation-repro-current-2026-05-18.log,
> jm-ha-reanimation-repro-events-2026-05-18.txt,
> jm-ha-reanimation-repro-pod-describe-2026-05-18.txt,
> jm-ha-reanimation-repro-previous-2026-05-18.log
--
This message was sent by Atlassian Jira
(v8.20.10#820010)