Krzysztof Palka created FLINK-39559:
---------------------------------------
Summary: SessionReconciler.recoverSession() creates JobManager
Deployment without ownerReferences, causing unrecoverable MISSING /
AlreadyExists loop
Key: FLINK-39559
URL: https://issues.apache.org/jira/browse/FLINK-39559
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: 1.14.0, 1.13.0, 1.12.1, 1.12.0, 1.11.0, 1.10.0
Environment: operator: 1.14.0
flink: 1.19.2
Reporter: Krzysztof Palka
A session cluster whose JM Deployment goes missing enters an unrecoverable
loop. The recovery path creates a Deployment without ownerReferences; JOSDK can
no longer link it back to the FlinkDeployment CR; the observer reports MISSING
again; recovery retries, hits 409 AlreadyExists, the catch path deletes the
orphan, and the cycle restarts.
{noformat}
┌─ TRIGGER ────────────────────────────────────────────────────────────────────┐
│ JM Deployment becomes invisible to JOSDK informer                            │
│ (deletion, restart race, REST timeout coinciding with cache miss, …)         │
└─────────────────────────┬────────────────────────────────────────────────────┘
                          ▼
AbstractFlinkDeploymentObserver
  .observeFlinkCluster()
    ctx.getJosdkContext().getSecondaryResource(Deployment.class) → empty
    jobManagerDeploymentStatus = MISSING                          [line 153]
                          │
                          ▼
SessionReconciler.reconcileOtherChanges()
  shouldRecoverDeployment(...) → true
  recoverSession(ctx)                                             [BUG site]
    ctx.getFlinkService().submitSessionCluster(getObserveConfig())
    ── observeConfig has NO kubernetes.jobmanager.owner.reference ──
      NativeFlinkService.deploySessionCluster(conf)
        KubernetesClusterDescriptor.deployClusterInternal(...)
          KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification()
            KubernetesJobManagerParameters.getOwnerReference() → []
    ┌──────────────────────────────────────────────────────┐
    │ Deployment created with .withOwnerReferences([])     │ ← no ownerRefs
    └──────────────────────────────────────────────────────┘
                          │
                          ▼
Next reconcile cycle
  getSecondaryResource(Deployment.class) → empty
  (JOSDK maps secondary→primary via ownerRefs)
  jobManagerDeploymentStatus = MISSING (again)
  shouldRecoverDeployment(...) → true
  recoverSession(ctx) → submitSessionCluster(...)
    Fabric8 .create(deployment) → 409 AlreadyExists
    catch { stopAndCleanupCluster(clusterId) }          ← deletes the orphan
                          │
                          ▼
                 ┌── LOOP forever ──┐
{noformat}
Code references for each frame:
* {{getSecondaryResource(Deployment.class)}}:
[{{AbstractFlinkDeploymentObserver.java#L115-L116}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L115-L116]
* {{MISSING}} set:
[{{AbstractFlinkDeploymentObserver.java#L153}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L153]
* {{shouldRecoverDeployment(...)}}:
[{{AbstractFlinkResourceReconciler.java#L498-L519}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L498-L519]
* BUG site, {{recoverSession()}}:
[{{SessionReconciler.java#L123-L128}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
* {{submitSessionCluster(observeConfig)}} call:
[{{SessionReconciler.java#L124}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]
* {{getOwnerReference()}} reading config:
[{{KubernetesJobManagerParameters.java#L109-L113}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java#L109-L113]
* {{.withOwnerReferences(...)}} on the Deployment:
[{{KubernetesJobManagerFactory.java#L122-L126}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java#L122-L126]
* 409 catch + cleanup:
[{{KubernetesClusterDescriptor.java#L302-L312}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L302-L312]
(deletes via
[{{stopAndCleanupCluster}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L307])
The bug is *deterministic* on this code path — once
[{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
is invoked, the cluster cannot self-recover. Manual intervention (kubectl
patch to add ownerReferences, or delete-and-recreate the
[{{FlinkDeployment}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/FlinkDeployment.java]
CR) is required to break the loop.
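The loop can be reproduced in a toy model. The sketch below is not operator code: {{FakeCluster}}, {{findOwnedBy}} and {{reconcile}} are hypothetical stand-ins for the API server, the ownerReference-based secondary lookup, and the observe/recover cycle. It only assumes what is described above: the lookup matches on ownerReferences, create fails on a name clash, and the catch path deletes the existing Deployment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Toy model of the MISSING / AlreadyExists loop; not operator code. */
final class OwnerRefLoopSketch {

    /** Stand-in for a Deployment: only the name and owner UIDs matter here. */
    record Deployment(String name, List<String> ownerUids) {}

    /** Stand-in for the API server: Deployments keyed by name. */
    static final class FakeCluster {
        final Map<String, Deployment> deployments = new HashMap<>();

        /** Mimics create(): fails if the name is already taken (HTTP 409). */
        void create(Deployment d) {
            if (deployments.putIfAbsent(d.name(), d) != null) {
                throw new IllegalStateException("AlreadyExists");
            }
        }

        /** Mimics a secondary lookup that links resources via ownerReferences. */
        Optional<Deployment> findOwnedBy(String ownerUid) {
            return deployments.values().stream()
                    .filter(d -> d.ownerUids().contains(ownerUid))
                    .findFirst();
        }
    }

    /** Runs the given number of reconcile cycles and records what each one did. */
    static List<String> reconcile(
            FakeCluster cluster, String crUid, boolean setOwnerRef, int cycles) {
        List<String> events = new ArrayList<>();
        for (int i = 0; i < cycles; i++) {
            if (cluster.findOwnedBy(crUid).isPresent()) {
                events.add("READY");
                continue;
            }
            // Nothing is linked to the CR, so the status is MISSING and recovery runs.
            List<String> owners = setOwnerRef ? List.of(crUid) : List.of();
            try {
                cluster.create(new Deployment("flink-deployment", owners));
                events.add("MISSING->created");
            } catch (IllegalStateException e) {
                // Mirrors the catch path: the residual (orphan) Deployment is deleted.
                cluster.deployments.remove("flink-deployment");
                events.add("MISSING->AlreadyExists->deleted");
            }
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(reconcile(new FakeCluster(), "cr-uid", false, 4));
        System.out.println(reconcile(new FakeCluster(), "cr-uid", true, 4));
    }
}
```

With {{setOwnerRef = false}} the model alternates between creating an orphan and deleting it; with {{true}} the first recovery is enough and every later cycle finds the Deployment.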
The same {{submitSessionCluster()}} is called from two places, but only one
populates the Flink config option that drives ownerReferences:
{noformat}
✅ HAPPY PATH                              ❌ BUG PATH
─────────────────────────                 ─────────────────────────
SessionReconciler.deploy()                SessionReconciler.recoverSession()
  setOwnerReference(cr, deployConfig)       ── (missing) ──
  └─ deployConfig.set(
       JOB_MANAGER_OWNER_REFERENCE, ...)
  submitSessionCluster(deployConfig)        submitSessionCluster(observeConfig)
            │                                         │
            ▼                                         ▼
KubernetesJobManagerParameters            KubernetesJobManagerParameters
  .getOwnerReference()                      .getOwnerReference()
  reads JOB_MANAGER_OWNER_REFERENCE         reads JOB_MANAGER_OWNER_REFERENCE
  → [{apiVersion, kind, name, uid, ...}]    → [] (option not set)
            │                                         │
            ▼                                         ▼
Deployment.metadata.ownerReferences =     Deployment.metadata.ownerReferences = []
  [→ FlinkDeployment CR]                    (orphan: JOSDK can't link)
{noformat}
Code:
*
[{{SessionReconciler.deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L98-L111]
sets ownerRef at
[L106|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L106]
before the submit at
[L107|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L107]
*
[{{SessionReconciler.recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
submits at
[L124|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]
without
[{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
*
[{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
only mutates the per-call {{Configuration}}; it is *never persisted* to
{{spec.flinkConfiguration}}, so
[{{getObserveConfig()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java#L343-L352]
never carries it
*
[{{ApplicationReconciler}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L270-L316]
is unaffected: its recovery routes through
[{{AbstractJobReconciler.restoreJob() → deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L373-L398],
and
[{{ApplicationReconciler.deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L141-L211]
calls
[{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L169]
at L169
* The gap dates back to FLINK-28979 / [commit
{{62eb68c8}}|https://github.com/apache/flink-kubernetes-operator/commit/62eb68c812713378f7f66fde01cf14370a2252e4]
(Oct 2022); the test
[{{SessionReconcilerTest#testSetOwnerReference}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java#L130-L149]
only exercises
[{{deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L98-L111],
not
[{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
* Still present on
[{{main}}|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
and in every release tag from
[{{release-1.10.0}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.10.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L116-L121]
through
[{{release-1.14.0}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
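The difference between the two call sites can be sketched in isolation. This is a toy illustration, not Flink code: a plain {{Map}} stands in for Flink's {{Configuration}}, both helper methods are hypothetical stand-ins for the real ones named above, and the encoding of the option value is an assumption made only for this example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy illustration of the two call sites; a Map stands in for Configuration. */
final class OwnerReferenceConfigSketch {

    // Option key as it appears in the flow diagram earlier in this report.
    static final String OWNER_REFERENCE_KEY = "kubernetes.jobmanager.owner.reference";

    /** Hypothetical stand-in for setOwnerReference(): mutates only the config it is given. */
    static void setOwnerReference(
            String crName, String crUid, Map<String, List<String>> conf) {
        // The value encoding here is invented for the example.
        conf.put(OWNER_REFERENCE_KEY, List.of("name:" + crName + ",uid:" + crUid));
    }

    /** Hypothetical stand-in for getOwnerReference(): empty when the option is unset. */
    static List<String> getOwnerReference(Map<String, List<String>> conf) {
        return conf.getOrDefault(OWNER_REFERENCE_KEY, List.of());
    }

    public static void main(String[] args) {
        Map<String, List<String>> deployConfig = new HashMap<>();
        Map<String, List<String>> observeConfig = new HashMap<>();

        // deploy(): the owner reference is written to the per-call config before submit.
        setOwnerReference("flink-deployment", "cr-uid", deployConfig);

        // recoverSession(): nothing is written, so the lookup falls back to empty.
        System.out.println("deploy path:  " + getOwnerReference(deployConfig));
        System.out.println("recover path: " + getOwnerReference(observeConfig));
    }
}
```

Because the mutation is local to {{deployConfig}}, a config object built separately for observation never sees it, which matches the behaviour described in the list above.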
h2. Logs
h3. First MISSING + entry into {{recoverSession()}}
The observer reports {{MISSING}} and the bug site is invoked. Note
{{Previous status: READY}}: the JM Deployment was healthy moments before:
{noformat}
2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver [ERROR] REST service in session cluster timed out
2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver [INFO ] Observing JobManager deployment. Previous status: READY
2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver [ERROR] Missing JobManager deployment
2026-04-27 09:29:18,701 o.a.f.k.o.l.AuditUtils [INFO ] >>> Event[Job] | Warning | MISSING | Missing JobManager deployment
2026-04-27 09:29:18,882 o.a.f.k.o.s.NativeFlinkService [INFO ] Deploying session cluster
2026-04-27 09:29:19,118 o.a.f.k.KubernetesClusterDescr [INFO ] Create flink session cluster flink-deployment successfully, JobManager Web Interface: http://flink-deployment-rest.temporal:8081
{noformat}
The Deployment is created, but as confirmed via kubectl immediately
afterwards, {{metadata.ownerReferences}} is empty, so JOSDK cannot link it back
to the FlinkDeployment CR.
h3. Second cycle: still MISSING, now hits 409 AlreadyExists
{noformat}
2026-04-27 09:29:19,426 o.a.f.k.o.o.d.SessionObserver [INFO ] Observing JobManager deployment. Previous status: DEPLOYING
2026-04-27 09:29:19,426 o.a.f.k.o.o.d.SessionObserver [ERROR] Missing JobManager deployment
2026-04-27 09:29:19,475 o.a.f.k.o.l.AuditUtils [INFO ] >>> Event[Job] | Warning | MISSING | Missing JobManager deployment
2026-04-27 09:29:19,755 o.a.f.k.o.s.NativeFlinkService [INFO ] Deploying session cluster
2026-04-27 09:29:19,828 o.a.f.k.KubernetesClusterDescr [WARN ] Failed to create the Kubernetes cluster "flink-deployment", try to clean up the residual resources.
2026-04-27 09:29:19,910 o.a.f.k.o.l.AuditUtils [INFO ] >>> Event[Job] | Warning | ERROR | Could not create Kubernetes cluster "flink-deployment". -> Failure executing: POST at: https://10.100.0.1:443/apis/apps/v1/namespaces/temporal/deployments. Message: deployments.apps "flink-deployment" already exists. Received status: ... reason=AlreadyExists, status=Failure ...
{noformat}
The catch path runs
[{{stopAndCleanupCluster()}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L307],
which deletes the orphan, and the next cycle creates a new orphan. The full
stack trace pinpoints
[{{recoverSession()}} line 124|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]:
{code:java}
org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster "flink-deployment".
    at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitSessionCluster(AbstractFlinkService.java:232)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler.recoverSession(SessionReconciler.java:124)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler.reconcileOtherChanges(SessionReconciler.java:117)
    ...
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: .../apis/apps/v1/namespaces/temporal/deployments. Message: deployments.apps "flink-deployment" already exists. ... reason=AlreadyExists, status=Failure ...
{code}
h3. Steady-state: MISSING / AlreadyExists every ~5–10 s
In the captured incident the loop ran for 38 minutes, producing
*410 {{Missing JobManager deployment}} log lines* and matching
{{AlreadyExists}} events, until manual intervention.
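As a quick sanity check on those figures, the average spacing between {{MISSING}} reports can be computed from the two numbers above. The helper name below is hypothetical and exists only for this calculation.

```java
import java.util.Locale;

/** Back-of-the-envelope check of the loop cadence from the incident figures. */
final class LoopCadenceSketch {

    /** Average number of seconds between occurrences over a window given in minutes. */
    static double averageIntervalSeconds(int windowMinutes, int occurrences) {
        return windowMinutes * 60.0 / occurrences;
    }

    public static void main(String[] args) {
        // 38 minutes and 410 log lines are the values reported for the incident.
        double avg = averageIntervalSeconds(38, 410);
        System.out.println(String.format(Locale.ROOT, "%.1f s between MISSING reports", avg));
    }
}
```

The result, roughly 5.6 s, sits at the lower end of the ~5–10 s range in the heading.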
h3. kubectl evidence — the {{flink}} manager wrote with no ownerReferences
{{metadata.managedFields}} is the smoking gun. The operator's manager
({{flink}}) has no {{f:ownerReferences}} in its tracked fields; only the
manual {{kubectl patch}} (3 seconds later) wrote them:
{noformat}
$ kubectl get deployment flink-deployment -n temporal -o json --show-managed-fields \
  | jq '[.metadata.managedFields[] | {manager, operation, time, ownerRef: (.fieldsV1["f:metadata"]["f:ownerReferences"] // null)}]'
[
{
"manager": "flink",
"operation": "Update",
"time": "2026-04-27T10:07:42Z",
"ownerRef": null ← bug: operator wrote NO ownerReferences
},
{
"manager": "kubectl-patch",
"operation": "Update",
"time": "2026-04-27T10:07:45Z", ← runbook fix applied 3 s later
"ownerRef": {
".": {},
"k:{\"uid\":\"bd14c6f9-1f8c-4c07-ab9f-2aec7b106b59\"}": {}
}
}
]{noformat}
For comparison, here is a healthy sibling cluster that never hit the bug. There
the {{flink}} manager itself wrote {{ownerReferences}}:
{noformat}
$ kubectl get deployment flink-deployment -n temporal -o json --show-managed-fields \
  | jq '[.metadata.managedFields[] | {manager, operation, time, ownerRef: (.fieldsV1["f:metadata"]["f:ownerReferences"] // null)}]'
[
{
"manager": "flink",
"operation": "Update",
"time": "2026-04-23T18:30:56Z",
"ownerRef": { ← happy path: deploy() set ownerReferences
".": {},
"k:{\"uid\":\"6274be7f-81f1-4ebe-8816-8d937da647fd\"}": {}
}
},
{
"manager": "kube-controller-manager",
"operation": "Update",
...
}
]{noformat}
h2. Real-world incident + workaround
Hit on a session cluster with HA enabled (Apache Flink Kubernetes Operator
[{{release-1.14.0}}|https://github.com/apache/flink-kubernetes-operator/tree/release-1.14.0],
image {{ghcr.io/apache/flink-kubernetes-operator:f504138}}, Flink 1.19.2).
Trigger was an unrelated transient REST timeout reported by
[{{SessionObserver}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java#L45]
coinciding with the [JOSDK secondary
lookup|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L115-L116]
returning empty. Recovery ran, ownerRefs were missing on the recreated
Deployment, and the cluster stayed stuck in the loop until the ownerReferences
were patched manually:
{code:bash}
# Look up the UID of the owning FlinkDeployment CR.
FLINK_UID=$(kubectl get flinkdeployment <name> -n <ns> -o jsonpath='{.metadata.uid}')
# Attach the CR as controller owner of the orphaned JobManager Deployment.
kubectl patch deployment <name> -n <ns> --type=merge \
  -p "{\"metadata\":{\"ownerReferences\":[{\"apiVersion\":\"flink.apache.org/v1beta1\",\"kind\":\"FlinkDeployment\",\"name\":\"<name>\",\"uid\":\"$FLINK_UID\",\"controller\":true,\"blockOwnerDeletion\":true}]}}"
{code}
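For tooling that applies the same workaround programmatically, the merge-patch body can be assembled as a plain string. The sketch below is only an illustration: the class and method names are hypothetical, and it assumes the name and UID need no JSON escaping (Kubernetes object names and UIDs normally contain no quotes or backslashes).

```java
/** Builds the merge-patch body used in the manual workaround; illustrative only. */
final class OwnerReferencePatchSketch {

    /**
     * Returns the JSON merge patch that sets a single controller ownerReference
     * pointing at the FlinkDeployment CR with the given name and UID.
     * Assumes both arguments are already JSON-safe.
     */
    static String mergePatch(String name, String uid) {
        return String.format(
                "{\"metadata\":{\"ownerReferences\":[{"
                        + "\"apiVersion\":\"flink.apache.org/v1beta1\","
                        + "\"kind\":\"FlinkDeployment\","
                        + "\"name\":\"%s\",\"uid\":\"%s\","
                        + "\"controller\":true,\"blockOwnerDeletion\":true}]}}",
                name, uid);
    }

    public static void main(String[] args) {
        // The UID is the one visible in the managedFields output earlier in this report.
        System.out.println(
                mergePatch("flink-deployment", "bd14c6f9-1f8c-4c07-ab9f-2aec7b106b59"));
    }
}
```

The resulting string is the same payload passed to {{kubectl patch ... --type=merge -p}} in the command above.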
h2. Possible fix
Add the missing
[{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
call in
[{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]:
{code:java}
private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Exception {
    var conf = ctx.getObserveConfig();
    setOwnerReference(ctx.getResource(), conf); // <-- add
    ctx.getFlinkService().submitSessionCluster(conf);
    ctx.getResource().getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
}
{code}