[ https://issues.apache.org/jira/browse/FLINK-39559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gyula Fora closed FLINK-39559.
------------------------------
Fix Version/s: kubernetes-operator-1.15.0
Assignee: Krzysztof Palka
Resolution: Fixed
merged to main e3e0cbe42f998248c11edc88628d4da4e63046d6
> SessionReconciler.recoverSession() creates JobManager Deployment without ownerReferences, causing unrecoverable MISSING / AlreadyExists loop
> --------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39559
> URL: https://issues.apache.org/jira/browse/FLINK-39559
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: 1.10.0, 1.11.0, 1.12.0, 1.12.1, 1.13.0, 1.14.0
> Environment: operator: 1.14.0
> flink: 1.19.2
> Reporter: Krzysztof Palka
> Assignee: Krzysztof Palka
> Priority: Major
> Labels: pull-request-available
> Fix For: kubernetes-operator-1.15.0
>
>
> A session cluster whose JM Deployment goes missing enters an unrecoverable
> loop. The recovery path creates a Deployment without ownerReferences; JOSDK
> can no longer link it back to the FlinkDeployment CR; the observer reports
> MISSING again; recovery retries, hits 409 AlreadyExists, the catch path
> deletes the orphan, and the cycle restarts.
>
> {noformat}
> ┌─ TRIGGER ────────────────────────────────────────────────────────────────────┐
> │ JM Deployment becomes invisible to JOSDK informer                            │
> │ (deletion, restart race, REST timeout coinciding with cache miss, …)         │
> └─────────────────────────┬────────────────────────────────────────────────────┘
>                           ▼
> AbstractFlinkDeploymentObserver
>   .observeFlinkCluster()
>     ctx.getJosdkContext().getSecondaryResource(Deployment.class) → empty
>     jobManagerDeploymentStatus = MISSING                        [line 153]
>                           │
>                           ▼
> SessionReconciler.reconcileOtherChanges()
>   shouldRecoverDeployment(...) → true
>   recoverSession(ctx)                                           [BUG site]
>     ctx.getFlinkService().submitSessionCluster(getObserveConfig())
>       ── observeConfig has NO kubernetes.jobmanager.owner.reference ──
>       NativeFlinkService.deploySessionCluster(conf)
>         KubernetesClusterDescriptor.deployClusterInternal(...)
>           KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification()
>             KubernetesJobManagerParameters.getOwnerReference() → []
>             ┌──────────────────────────────────────────────────────┐
>             │ Deployment created with .withOwnerReferences([])     │ ← no ownerRefs
>             └──────────────────────────────────────────────────────┘
>                           │
>                           ▼
> Next reconcile cycle
>   getSecondaryResource(Deployment.class) → empty
>     (JOSDK maps secondary→primary via ownerRefs)
>   jobManagerDeploymentStatus = MISSING (again)
>   shouldRecoverDeployment(...) → true
>   recoverSession(ctx) → submitSessionCluster(...)
>     Fabric8 .create(deployment) → 409 AlreadyExists
>     catch { stopAndCleanupCluster(clusterId) }                  ← deletes the orphan
>                           │
>                           ▼
>                  ┌── LOOP forever ──┐
> {noformat}
> Code references for each frame:
> * {{getSecondaryResource(Deployment.class)}}: [{{AbstractFlinkDeploymentObserver.java#L115-L116}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L115-L116]
> * {{MISSING}} set: [{{AbstractFlinkDeploymentObserver.java#L153}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L153]
> * {{shouldRecoverDeployment(...)}}: [{{AbstractFlinkResourceReconciler.java#L498-L519}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L498-L519]
> * {{recoverSession()}} (BUG site): [{{SessionReconciler.java#L123-L128}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
> * {{submitSessionCluster(observeConfig)}} call: [{{SessionReconciler.java#L124}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]
> * {{getOwnerReference()}} reading config: [{{KubernetesJobManagerParameters.java#L109-L113}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java#L109-L113]
> * {{.withOwnerReferences(...)}} on the Deployment: [{{KubernetesJobManagerFactory.java#L122-L126}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java#L122-L126]
> * 409 catch + cleanup: [{{KubernetesClusterDescriptor.java#L302-L312}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L302-L312] (deletes via [{{stopAndCleanupCluster}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L307])
> The bug is *deterministic* on this code path. Once [{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128] is invoked, the cluster cannot self-recover. Breaking the loop requires manual intervention: a {{kubectl patch}} that adds ownerReferences, or deleting and recreating the [{{FlinkDeployment}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/FlinkDeployment.java] CR.
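The toy model below makes the determinism argument concrete. It is a sketch under stated assumptions, not operator or JOSDK code. `LoopModel`, its nested `Cluster` state and the `ownerUid` field are hypothetical. The model folds the observer, `recoverSession()` and the 409 cleanup in `KubernetesClusterDescriptor` into a few lines. It also assumes, as the report describes, that the observer sees the Deployment only when its ownerReferences point at the CR.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Toy model of the observe -> recoverSession() -> create cycle; not operator or JOSDK code. */
final class LoopModel {
    /** Hypothetical snapshot of the JM Deployment as stored in the API server. */
    private static final class Cluster {
        boolean deploymentExists = false; // a Deployment named after the cluster-id exists
        String ownerUid = null;           // uid in its ownerReferences, null when the list is empty
    }

    /** Runs the given number of reconcile cycles and records what the observer saw each time. */
    static List<String> run(String crUid, boolean setOwnerRef, int cycles) {
        Cluster cluster = new Cluster(); // starts right after the trigger: the Deployment is gone
        List<String> observed = new ArrayList<>();
        for (int i = 0; i < cycles; i++) {
            // Observer: assumed to see the Deployment only if its ownerReferences point at the CR.
            boolean visible = cluster.deploymentExists && crUid.equals(cluster.ownerUid);
            if (visible) {
                observed.add("FOUND");
                continue;
            }
            observed.add("MISSING");
            // recoverSession() -> submitSessionCluster() -> create the Deployment.
            if (cluster.deploymentExists) {
                // 409 AlreadyExists: the catch path calls stopAndCleanupCluster(), deleting it.
                cluster.deploymentExists = false;
                cluster.ownerUid = null;
            } else {
                cluster.deploymentExists = true;
                // setOwnerRef == false models the bug: ownerReferences = []
                cluster.ownerUid = setOwnerRef ? crUid : null;
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println("without owner ref: " + run("cr-uid", false, 6));
        System.out.println("with owner ref:    " + run("cr-uid", true, 6));
    }
}
{code}

Running {{main}} prints six {{MISSING}} entries for the first call. The second call prints {{[MISSING, FOUND, FOUND, FOUND, FOUND, FOUND]}}: once recovery writes the owner reference, the very next cycle links the Deployment back to the CR.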
> h2. Root cause
> The same {{submitSessionCluster()}} is called from two places, but only one
> populates the Flink config option that drives ownerReferences:
> {noformat}
> ✅ HAPPY PATH                                 ❌ BUG PATH
> ─────────────────────────                     ─────────────────────────
> SessionReconciler.deploy()                    SessionReconciler.recoverSession()
>   setOwnerReference(cr, deployConfig)           ── (missing) ──
>   └─ deployConfig.set(
>        JOB_MANAGER_OWNER_REFERENCE, ...)
>   submitSessionCluster(deployConfig)            submitSessionCluster(observeConfig)
>         │                                             │
>         ▼                                             ▼
> KubernetesJobManagerParameters                KubernetesJobManagerParameters
>   .getOwnerReference()                          .getOwnerReference()
>   reads JOB_MANAGER_OWNER_REFERENCE             reads JOB_MANAGER_OWNER_REFERENCE
>   → [{apiVersion, kind, name, uid, ...}]        → []  (option not set)
>         │                                             │
>         ▼                                             ▼
> Deployment.metadata.ownerReferences =         Deployment.metadata.ownerReferences = []
>   [→ FlinkDeployment CR]                        (orphan: JOSDK can't link it)
> {noformat}
> Code:
> * [{{SessionReconciler.deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L98-L111] sets ownerRef at [L106|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L106] before [L107 submit|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L107]
> * [{{SessionReconciler.recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128] submits at [L124|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124] without [{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
> * [{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554] only mutates the per-call {{Configuration}}; it is *never persisted* to {{spec.flinkConfiguration}}, so [{{getObserveConfig()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java#L343-L352] never carries it
> * [{{ApplicationReconciler}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L270-L316] is unaffected: its recovery routes through [{{AbstractJobReconciler.restoreJob() → deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L373-L398], and [{{ApplicationReconciler.deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L141-L211] calls [{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L169] at L169
> * The gap dates back to FLINK-28979 / [commit {{62eb68c8}}|https://github.com/apache/flink-kubernetes-operator/commit/62eb68c812713378f7f66fde01cf14370a2252e4] (Oct 2022); the test [{{SessionReconcilerTest#testSetOwnerReference}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java#L130-L149] only exercises [{{deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L98-L111], not [{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
> * Still present on [{{main}}|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128] and in every release tag from [{{release-1.10.0}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.10.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L116-L121] through [{{release-1.14.0}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
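The point that only the per-call {{Configuration}} is mutated can be shown without any Flink classes. This sketch is hypothetical throughout. A {{Map<String, String>}} stands in for Flink's {{Configuration}}, and {{SPEC_CONFIG}} stands in for {{spec.flinkConfiguration}}. {{freshConfig()}} stands in for however the deploy and observe configs are derived from the spec. The option key {{kubernetes.jobmanager.owner.reference}} comes from the report; the value written under it is illustrative only.

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: configs derived from the spec are fresh copies, so per-call edits vanish. */
final class ConfigCopyDemo {
    static final String OWNER_REF_KEY = "kubernetes.jobmanager.owner.reference";

    /** Stand-in for spec.flinkConfiguration; setOwnerReference() never writes here. */
    static final Map<String, String> SPEC_CONFIG = Map.of("kubernetes.cluster-id", "flink-deployment");

    /** Stand-in for deriving a deploy/observe Configuration: a new mutable copy of the spec. */
    static Map<String, String> freshConfig() {
        return new HashMap<>(SPEC_CONFIG);
    }

    /** Stand-in for setOwnerReference(cr, conf): mutates only the object it is handed. */
    static void setOwnerReference(String crUid, Map<String, String> conf) {
        conf.put(OWNER_REF_KEY, "kind:FlinkDeployment,uid:" + crUid); // illustrative value only
    }

    public static void main(String[] args) {
        Map<String, String> deployConfig = freshConfig();
        setOwnerReference("cr-uid", deployConfig);          // deploy(): happy path
        Map<String, String> observeConfig = freshConfig(); // recoverSession(): no such call

        System.out.println(deployConfig.containsKey(OWNER_REF_KEY));  // true
        System.out.println(observeConfig.containsKey(OWNER_REF_KEY)); // false
        System.out.println(SPEC_CONFIG.containsKey(OWNER_REF_KEY));   // false: nothing persisted
    }
}
{code}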
> h2. Logs
> h3. First MISSING + entry into {{recoverSession()}}
> The observer reports {{MISSING}} and the bug site is invoked. Note {{Previous status: READY}}: the JM Deployment was healthy moments before:
> {noformat}
> 2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver [ERROR] REST service in session cluster timed out
> 2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver [INFO ] Observing JobManager deployment. Previous status: READY
> 2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver [ERROR] Missing JobManager deployment
> 2026-04-27 09:29:18,701 o.a.f.k.o.l.AuditUtils [INFO ] >>> Event[Job] | Warning | MISSING | Missing JobManager deployment
> 2026-04-27 09:29:18,882 o.a.f.k.o.s.NativeFlinkService [INFO ] Deploying session cluster
> 2026-04-27 09:29:19,118 o.a.f.k.KubernetesClusterDescr [INFO ] Create flink session cluster flink-deployment successfully, JobManager Web Interface: http://flink-deployment-rest.temporal:8081
> {noformat}
> The Deployment is created, but kubectl showed immediately afterwards that {{metadata.ownerReferences}} is empty, so JOSDK cannot link it back to the {{FlinkDeployment}} CR.
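The report attributes the repeated {{MISSING}} to JOSDK mapping the secondary Deployment to its primary through {{ownerReferences}}. Below is a hedged, stdlib-only sketch of that kind of lookup. {{OwnerRefMapping}}, the {{OwnerRef}} record and the {{kind}}/{{apiVersion}} filter are hypothetical simplifications, not JOSDK's actual mapper. A Deployment with an empty list maps to no primary, so a lookup from the FlinkDeployment side comes back empty even though the object exists.

{code:java}
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch of an ownerReferences-based secondary -> primary mapping. */
final class OwnerRefMapping {
    /** Only the ownerReference fields this sketch needs. */
    record OwnerRef(String apiVersion, String kind, String name, String uid) {}

    /** Uids of the FlinkDeployment owners listed on a secondary resource (e.g. the JM Deployment). */
    static Set<String> primaryUidsFor(List<OwnerRef> ownerReferences) {
        return ownerReferences.stream()
                .filter(ref -> "FlinkDeployment".equals(ref.kind()))
                .filter(ref -> ref.apiVersion().startsWith("flink.apache.org/"))
                .map(OwnerRef::uid)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // Owner reference as written on the deploy() path (uid of the healthy cluster in this report).
        List<OwnerRef> viaDeploy = List.of(new OwnerRef("flink.apache.org/v1beta1", "FlinkDeployment",
                "flink-deployment", "6274be7f-81f1-4ebe-8816-8d937da647fd"));
        // What recoverSession() produced: .withOwnerReferences([]).
        List<OwnerRef> viaRecover = List.of();

        System.out.println(primaryUidsFor(viaDeploy));  // one uid: the CR can find its Deployment
        System.out.println(primaryUidsFor(viaRecover)); // empty: no primary, so the lookup is empty
    }
}
{code}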
> h3. Second cycle: still MISSING, now hits 409 AlreadyExists
> {noformat}
> 2026-04-27 09:29:19,426 o.a.f.k.o.o.d.SessionObserver [INFO ] Observing JobManager deployment. Previous status: DEPLOYING
> 2026-04-27 09:29:19,426 o.a.f.k.o.o.d.SessionObserver [ERROR] Missing JobManager deployment
> 2026-04-27 09:29:19,475 o.a.f.k.o.l.AuditUtils [INFO ] >>> Event[Job] | Warning | MISSING | Missing JobManager deployment
> 2026-04-27 09:29:19,755 o.a.f.k.o.s.NativeFlinkService [INFO ] Deploying session cluster
> 2026-04-27 09:29:19,828 o.a.f.k.KubernetesClusterDescr [WARN ] Failed to create the Kubernetes cluster "flink-deployment", try to clean up the residual resources.
> 2026-04-27 09:29:19,910 o.a.f.k.o.l.AuditUtils [INFO ] >>> Event[Job] | Warning | ERROR | Could not create Kubernetes cluster "flink-deployment".
> -> Failure executing: POST at: https://10.100.0.1:443/apis/apps/v1/namespaces/temporal/deployments. Message: deployments.apps "flink-deployment" already exists. Received status: ... reason=AlreadyExists, status=Failure ...
> {noformat}
> The catch path runs [{{stopAndCleanupCluster()}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L307], which deletes the orphan, and the next cycle creates a new orphan. The full stack trace pinpoints [{{recoverSession()}} line 124|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]:
> {code:java}
> org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster "flink-deployment".
>     at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitSessionCluster(AbstractFlinkService.java:232)
>     at org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler.recoverSession(SessionReconciler.java:124)
>     at org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler.reconcileOtherChanges(SessionReconciler.java:117)
>     ...
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: .../apis/apps/v1/namespaces/temporal/deployments. Message: deployments.apps "flink-deployment" already exists. ... reason=AlreadyExists, status=Failure ...
> {code}
> h3. Steady-state: MISSING / AlreadyExists every ~5–10 s
> In the captured incident, the loop ran for 38 minutes and produced *410 {{Missing JobManager deployment}} log lines* with matching {{AlreadyExists}} events, until manual intervention broke it.
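The loop cadence can be checked from the operator log alone. As a sanity check, 410 occurrences over 38 minutes (2,280 s) work out to roughly 5.6 s per cycle, the low end of the ~5–10 s range. The minimal sketch below assumes that each log line starts with the {{yyyy-MM-dd HH:mm:ss,SSS}} timestamp shown in the excerpts above. It counts only the {{SessionObserver}} line, so the AuditUtils event repeating the same text is not double-counted. {{MissingCadence}} and {{meanInterval()}} are hypothetical names.

{code:java}
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: average interval between "Missing JobManager deployment" log lines. */
final class MissingCadence {
    private static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");
    private static final String MARKER = "Missing JobManager deployment";

    /** Mean gap between matching lines, or Duration.ZERO if fewer than two lines match. */
    static Duration meanInterval(List<String> lines) {
        List<LocalDateTime> stamps = new ArrayList<>();
        for (String line : lines) {
            // Count only the observer's line, not the AuditUtils event that repeats the text.
            if (line.contains(MARKER) && line.contains("SessionObserver")) {
                stamps.add(LocalDateTime.parse(line.substring(0, 23), TS)); // leading timestamp
            }
        }
        if (stamps.size() < 2) {
            return Duration.ZERO;
        }
        Duration total = Duration.between(stamps.get(0), stamps.get(stamps.size() - 1));
        return total.dividedBy(stamps.size() - 1);
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
                "2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver [ERROR] Missing JobManager deployment",
                "2026-04-27 09:29:18,701 o.a.f.k.o.l.AuditUtils [INFO ] >>> Event[Job] | Warning | MISSING | Missing JobManager deployment",
                "2026-04-27 09:29:19,426 o.a.f.k.o.o.d.SessionObserver [ERROR] Missing JobManager deployment");
        System.out.println(meanInterval(sample)); // PT0.759S
    }
}
{code}

The two sample cycles are the fast first pair from the logs above. Over a full incident log, the mean should settle near the steady-state figure.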
> h3. kubectl evidence: the {{flink}} field manager wrote no ownerReferences
> {{metadata.managedFields}} is the smoking gun. The operator's field manager ({{flink}}) has no {{f:ownerReferences}} among its tracked fields. Only the manual {{kubectl patch}}, applied 3 seconds later, wrote them:
> {noformat}
> $ kubectl get deployment flink-deployment -n temporal -o json --show-managed-fields \
>     | jq '[.metadata.managedFields[] | {manager, operation, time, ownerRef: (.fieldsV1["f:metadata"]["f:ownerReferences"] // null)}]'
> [
>   {
>     "manager": "flink",
>     "operation": "Update",
>     "time": "2026-04-27T10:07:42Z",
>     "ownerRef": null                  ← bug: operator wrote NO ownerReferences
>   },
>   {
>     "manager": "kubectl-patch",
>     "operation": "Update",
>     "time": "2026-04-27T10:07:45Z",   ← runbook fix applied 3 s later
>     "ownerRef": {
>       ".": {},
>       "k:{\"uid\":\"bd14c6f9-1f8c-4c07-ab9f-2aec7b106b59\"}": {}
>     }
>   }
> ]
> {noformat}
> For comparison, here is a healthy sibling cluster that never hit the bug. There, the {{flink}} manager itself wrote {{ownerReferences}}:
> {noformat}
> $ kubectl get deployment flink-deployment -n temporal -o json --show-managed-fields \
>     | jq '[.metadata.managedFields[] | {manager, operation, time, ownerRef: (.fieldsV1["f:metadata"]["f:ownerReferences"] // null)}]'
> [
>   {
>     "manager": "flink",
>     "operation": "Update",
>     "time": "2026-04-23T18:30:56Z",
>     "ownerRef": {                     ← happy path: deploy() set ownerReferences
>       ".": {},
>       "k:{\"uid\":\"6274be7f-81f1-4ebe-8816-8d937da647fd\"}": {}
>     }
>   },
>   {
>     "manager": "kube-controller-manager",
>     "operation": "Update",
>     ...
>   }
> ]
> {noformat}
> h2. Real-world incident + workaround
> Hit on a session cluster with HA enabled (Apache Flink Kubernetes Operator [{{release-1.14.0}}|https://github.com/apache/flink-kubernetes-operator/tree/release-1.14.0], image {{ghcr.io/apache/flink-kubernetes-operator:f504138}}, Flink 1.19.2). The trigger was an unrelated transient REST timeout reported by [{{SessionObserver}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java#L45], which coincided with the [JOSDK secondary lookup|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L115-L116] returning empty. Recovery ran and the recreated Deployment had no ownerRefs. The cluster stayed stuck in the loop until the ownerReferences were patched manually with:
> {code:bash}
> FLINK_UID=$(kubectl get flinkdeployment <name> -n <ns> -o jsonpath='{.metadata.uid}')
> kubectl patch deployment <name> -n <ns> --type=merge \
>   -p "{\"metadata\":{\"ownerReferences\":[{\"apiVersion\":\"flink.apache.org/v1beta1\",\"kind\":\"FlinkDeployment\",\"name\":\"<name>\",\"uid\":\"$FLINK_UID\",\"controller\":true,\"blockOwnerDeletion\":true}]}}"
> {code}
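For scripted runbooks, the same merge-patch body can be generated in code. Below is a stdlib-only sketch that reuses the field values from the command above: {{flink.apache.org/v1beta1}}, {{FlinkDeployment}}, {{controller: true}} and {{blockOwnerDeletion: true}}. {{OwnerRefPatch.body()}} is a hypothetical helper, and the output would still be applied with {{kubectl patch --type=merge}} or an equivalent client call. Note that a JSON merge patch (RFC 7386) replaces arrays wholesale, so this body overwrites any existing {{ownerReferences}} list rather than appending to it.

{code:java}
/** Hypothetical helper that renders the ownerReferences merge patch used in the workaround. */
final class OwnerRefPatch {
    /** Minimal JSON string quoting: escapes backslashes and double quotes only. */
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds the same body as the kubectl workaround for one FlinkDeployment owner. */
    static String body(String crName, String crUid) {
        return "{\"metadata\":{\"ownerReferences\":[{"
                + "\"apiVersion\":\"flink.apache.org/v1beta1\","
                + "\"kind\":\"FlinkDeployment\","
                + "\"name\":" + quote(crName) + ","
                + "\"uid\":" + quote(crUid) + ","
                + "\"controller\":true,"
                + "\"blockOwnerDeletion\":true}]}}";
    }

    public static void main(String[] args) {
        // args: <cr-name> <cr-uid>; the defaults reuse the example name and uid from this report.
        String name = args.length > 0 ? args[0] : "flink-deployment";
        String uid = args.length > 1 ? args[1] : "bd14c6f9-1f8c-4c07-ab9f-2aec7b106b59";
        System.out.println(body(name, uid));
    }
}
{code}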
> h2. Possible fix
> Add the missing
> [{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
> call in
> [{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]:
> {code:java}
> private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Exception {
>     var conf = ctx.getObserveConfig();
>     // Populate kubernetes.jobmanager.owner.reference, as deploy() does, so the recreated
>     // JM Deployment is owned by, and discoverable from, the FlinkDeployment CR.
>     setOwnerReference(ctx.getResource(), conf); // <-- add
>     ctx.getFlinkService().submitSessionCluster(conf);
>     ctx.getResource().getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
> }
> {code}
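The report notes that {{SessionReconcilerTest#testSetOwnerReference}} only exercises {{deploy()}}. A regression check for the recovery path would assert on the configuration actually handed to {{submitSessionCluster()}}. The sketch below is hypothetical end to end. {{SessionService}}, {{CapturingService}} and {{ModelReconciler}} are simplified stand-ins, not the operator's test utilities. The sketch only shows the shape of the assertion, with the proposed fix mirrored in {{recoverSession()}}.

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Hypothetical regression check: both submit paths must carry the owner reference. */
final class RecoverSessionRegression {
    static final String OWNER_REF_KEY = "kubernetes.jobmanager.owner.reference";

    /** Stand-in for the FlinkService call that creates the JM Deployment. */
    interface SessionService {
        void submitSessionCluster(Map<String, String> conf);
    }

    /** Test double that records a copy of the last submitted config. */
    static final class CapturingService implements SessionService {
        Map<String, String> lastSubmitted;

        @Override
        public void submitSessionCluster(Map<String, String> conf) {
            lastSubmitted = new HashMap<>(conf);
        }
    }

    /** Simplified reconciler mirroring the proposed fix: recovery also sets the owner ref. */
    static final class ModelReconciler {
        private final SessionService service;
        private final String crUid;

        ModelReconciler(SessionService service, String crUid) {
            this.service = service;
            this.crUid = crUid;
        }

        private void setOwnerReference(Map<String, String> conf) {
            conf.put(OWNER_REF_KEY, "kind:FlinkDeployment,uid:" + crUid); // illustrative value
        }

        void deploy() {
            Map<String, String> conf = new HashMap<>(); // stands in for the deploy config
            setOwnerReference(conf);
            service.submitSessionCluster(conf);
        }

        void recoverSession() {
            Map<String, String> conf = new HashMap<>(); // stands in for getObserveConfig()
            setOwnerReference(conf);                     // the line the fix adds
            service.submitSessionCluster(conf);
        }
    }

    /** True if the recovery path submitted a config whose owner reference names this CR. */
    static boolean recoveryCarriesOwnerRef(String crUid) {
        CapturingService svc = new CapturingService();
        new ModelReconciler(svc, crUid).recoverSession();
        return svc.lastSubmitted != null
                && svc.lastSubmitted.getOrDefault(OWNER_REF_KEY, "").contains("uid:" + crUid);
    }

    public static void main(String[] args) {
        System.out.println(recoveryCarriesOwnerRef("cr-uid")); // true
    }
}
{code}

Without the added {{setOwnerReference(conf)}} line in {{recoverSession()}}, {{recoveryCarriesOwnerRef()}} would return {{false}}. That is the gap the existing test does not cover.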
--
This message was sent by Atlassian Jira
(v8.20.10#820010)