[ 
https://issues.apache.org/jira/browse/FLINK-39559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krzysztof Palka updated FLINK-39559:
------------------------------------
    Description: 
A session cluster whose JM Deployment goes missing enters an unrecoverable 
loop. The recovery path creates a Deployment without ownerReferences; JOSDK can 
no longer link it back to the FlinkDeployment CR; the observer reports MISSING 
again; recovery retries, hits 409 AlreadyExists, the catch path deletes the 
orphan, and the cycle restarts.

 
{noformat}
┌─ TRIGGER ────────────────────────────────────────────────────────────────────┐
│ JM Deployment becomes invisible to JOSDK informer                            │
│ (deletion, restart race, REST timeout coinciding with cache miss, …)         │
└─────────────────────────┬────────────────────────────────────────────────────┘
                          ▼
  AbstractFlinkDeploymentObserver
    .observeFlinkCluster()
      ctx.getJosdkContext().getSecondaryResource(Deployment.class)  → empty
      jobManagerDeploymentStatus = MISSING                          [line 153]
                          │
                          ▼
  SessionReconciler.reconcileOtherChanges()
    shouldRecoverDeployment(...)                                    → true
    recoverSession(ctx)                                             [BUG site]
      ctx.getFlinkService().submitSessionCluster(getObserveConfig())
        ── observeConfig has NO kubernetes.jobmanager.owner.reference ──
      NativeFlinkService.deploySessionCluster(conf)
        KubernetesClusterDescriptor.deployClusterInternal(...)
          KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification()
            KubernetesJobManagerParameters.getOwnerReference()      → []
          ┌──────────────────────────────────────────────────────┐
          │  Deployment created with .withOwnerReferences([])    │  ← no ownerRefs
          └──────────────────────────────────────────────────────┘
                          │
                          ▼
  Next reconcile cycle
    getSecondaryResource(Deployment.class)                          → empty
       (JOSDK maps secondary→primary via ownerRefs)
    jobManagerDeploymentStatus = MISSING (again)
    shouldRecoverDeployment(...)                                    → true
    recoverSession(ctx) → submitSessionCluster(...)
      Fabric8 .create(deployment)                                   → 409 AlreadyExists
      catch { stopAndCleanupCluster(clusterId) }                    ← deletes the orphan
                          │
                          ▼
                  ┌── LOOP forever ──┐{noformat}
Code references for each frame:
 * {{{}getSecondaryResource(Deployment.class){}}}: 
[{{AbstractFlinkDeploymentObserver.java#L115-L116}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L115-L116]
 * {{MISSING}} set: 
[{{AbstractFlinkDeploymentObserver.java#L153}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L153]
 * {{{}shouldRecoverDeployment(...){}}}: 
[{{AbstractFlinkResourceReconciler.java#L498-L519}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L498-L519]
 * BUG — {{{}recoverSession(){}}}: 
[{{SessionReconciler.java#L123-L128}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
 * {{submitSessionCluster(observeConfig)}} call: 
[{{SessionReconciler.java#L124}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]
 * {{getOwnerReference()}} reading config: 
[{{KubernetesJobManagerParameters.java#L109-L113}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java#L109-L113]
 * {{.withOwnerReferences(...)}} on the Deployment: 
[{{KubernetesJobManagerFactory.java#L122-L126}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java#L122-L126]
 * 409 catch + cleanup: 
[{{KubernetesClusterDescriptor.java#L302-L312}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L302-L312]
 (deletes via 
[{{stopAndCleanupCluster}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L307])

The bug is *deterministic* on this code path — once 
[{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
 is invoked, the cluster cannot self-recover. Manual intervention (kubectl 
patch to add ownerReferences, or delete-and-recreate the 
[{{FlinkDeployment}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/FlinkDeployment.java]
 CR) is required to break the loop.
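
The loop can be reproduced in a toy model. The sketch below is not operator code: {{FakeCluster}}, {{Deployment}} and {{reconcile()}} are hypothetical stand-ins, and it assumes only what the flow above describes (lookup by owner, create without ownerReferences, delete on a name clash).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model of the reconcile loop described above; not operator code. */
class OwnerRefLoopSketch {

    /** Hypothetical stand-in for a Deployment; ownerUid == null means no ownerReferences. */
    static final class Deployment {
        final String name;
        final String ownerUid;

        Deployment(String name, String ownerUid) {
            this.name = name;
            this.ownerUid = ownerUid;
        }
    }

    /** Hypothetical stand-in for the API server, keyed by Deployment name. */
    static final class FakeCluster {
        final Map<String, Deployment> deployments = new HashMap<>();

        /** Mimics an owner-based secondary lookup: only Deployments owned by crUid are visible. */
        Optional<Deployment> secondaryFor(String crUid) {
            return deployments.values().stream()
                    .filter(d -> crUid.equals(d.ownerUid))
                    .findFirst();
        }

        /** Mimics create plus cleanup: a name clash (409) deletes the existing object and fails. */
        boolean create(Deployment d) {
            if (deployments.containsKey(d.name)) {
                deployments.remove(d.name);
                return false;
            }
            deployments.put(d.name, d);
            return true;
        }
    }

    /** Runs up to maxCycles reconcile cycles and returns the number of recovery attempts. */
    static int reconcile(FakeCluster cluster, String crUid, boolean setOwnerRef, int maxCycles) {
        int recoveries = 0;
        for (int i = 0; i < maxCycles; i++) {
            if (cluster.secondaryFor(crUid).isPresent()) {
                break; // the observer sees the Deployment, so no recovery is needed
            }
            recoveries++;
            cluster.create(new Deployment("flink-deployment", setOwnerRef ? crUid : null));
        }
        return recoveries;
    }

    public static void main(String[] args) {
        // Without an owner reference every cycle triggers another recovery attempt.
        System.out.println("without ownerRef: " + reconcile(new FakeCluster(), "uid-1", false, 10));
        // With an owner reference the first recovery is visible to the next lookup.
        System.out.println("with ownerRef:    " + reconcile(new FakeCluster(), "uid-1", true, 10));
    }
}
```

In this model the run without an owner reference uses all 10 cycles, alternating between creating an orphan and deleting it, while the run with an owner reference stops after a single recovery.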
h2. Root cause

The same {{submitSessionCluster()}} is called from two places, but only one 
populates the Flink config option that drives ownerReferences:
{noformat}
✅ HAPPY PATH                                ❌ BUG PATH
─────────────────────────                    ─────────────────────────
SessionReconciler.deploy()                   SessionReconciler.recoverSession()
  setOwnerReference(cr, deployConfig)          ── (missing) ──
   └─ deployConfig.set(
        JOB_MANAGER_OWNER_REFERENCE, ...)
  submitSessionCluster(deployConfig)           submitSessionCluster(observeConfig)

         │                                            │
         ▼                                            ▼
  KubernetesJobManagerParameters              KubernetesJobManagerParameters
    .getOwnerReference()                        .getOwnerReference()
       reads JOB_MANAGER_OWNER_REFERENCE           reads JOB_MANAGER_OWNER_REFERENCE
       → [{apiVersion, kind, name, uid, ...}]      → []          (option not set)
         │                                            │
         ▼                                            ▼
  Deployment.metadata.ownerReferences =        Deployment.metadata.ownerReferences = []
    [→ FlinkDeployment CR]                       (orphan: JOSDK can't link){noformat}
Code:
 * 
[{{SessionReconciler.deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L98-L111]
 sets ownerRef at 
[L106|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L106]
 before [L107 
submit|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L107]
 * 
[{{SessionReconciler.recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
 submits at 
[L124|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]
 without 
[{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
 * 
[{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
 only mutates the per-call {{{}Configuration{}}}; it is *never persisted* to 
{{{}spec.flinkConfiguration{}}}, so 
[{{getObserveConfig()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java#L343-L352]
 never carries it
 * 
[{{ApplicationReconciler}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L270-L316]
 is unaffected: its recovery routes through 
[{{AbstractJobReconciler.restoreJob() → 
deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L373-L398],
 and 
[{{ApplicationReconciler.deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L141-L211]
 calls 
[{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L169]
 at L169
 * The gap dates back to FLINK-28979 / [commit 
{{62eb68c8}}|https://github.com/apache/flink-kubernetes-operator/commit/62eb68c812713378f7f66fde01cf14370a2252e4]
 (Oct 2022); the test 
[{{SessionReconcilerTest#testSetOwnerReference}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java#L130-L149]
 only exercises 
[{{deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L98-L111],
 not 
[{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
 * Still present on 
[{{main}}|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
 and in every release tag from 
[{{release-1.10.0}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.10.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L116-L121]
 through 
[{{release-1.14.0}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
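
Why the observe config never carries the option can be shown with plain maps. This is a minimal sketch, assuming each per-call config is a fresh copy derived from the persisted spec; {{configFromSpec()}} and the value format written by this {{setOwnerReference()}} are hypothetical stand-ins, not the operator's implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of why the observe config lacks the owner reference; not operator code. */
class ObserveConfigSketch {
    // Option key as named in the flow diagram of this report.
    static final String OWNER_REF_KEY = "kubernetes.jobmanager.owner.reference";

    /** Hypothetical stand-in for deriving a per-call config from the persisted spec. */
    static Map<String, String> configFromSpec(Map<String, String> specFlinkConfiguration) {
        return new HashMap<>(specFlinkConfiguration); // fresh copy on every call
    }

    /** Hypothetical stand-in for setOwnerReference(): mutates only the map it is handed. */
    static void setOwnerReference(String crUid, Map<String, String> conf) {
        conf.put(OWNER_REF_KEY, "uid:" + crUid); // placeholder value format
    }

    public static void main(String[] args) {
        Map<String, String> spec = new HashMap<>();
        spec.put("taskmanager.numberOfTaskSlots", "2");

        // deploy() path: the owner reference is added to the per-call copy.
        Map<String, String> deployConfig = configFromSpec(spec);
        setOwnerReference("uid-1", deployConfig);

        // recoverSession() path: a new copy is derived from the unchanged spec.
        Map<String, String> observeConfig = configFromSpec(spec);

        System.out.println("deployConfig has owner ref:  " + deployConfig.containsKey(OWNER_REF_KEY));
        System.out.println("observeConfig has owner ref: " + observeConfig.containsKey(OWNER_REF_KEY));
    }
}
```

Because the mutation never reaches the spec, any config derived later starts without the key, which is the situation {{recoverSession()}} is in.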

h2. Logs
h3. First MISSING + entry into {{recoverSession()}}

The observer reports {{MISSING}} and the bug site is invoked. {{Previous status: READY}} shows that the JM Deployment was healthy moments before:
{noformat}
2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver  [ERROR] REST service in session cluster timed out
2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver  [INFO ] Observing JobManager deployment. Previous status: READY
2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver  [ERROR] Missing JobManager deployment
2026-04-27 09:29:18,701 o.a.f.k.o.l.AuditUtils         [INFO ] >>> Event[Job] | Warning | MISSING | Missing JobManager deployment
2026-04-27 09:29:18,882 o.a.f.k.o.s.NativeFlinkService [INFO ] Deploying session cluster
2026-04-27 09:29:19,118 o.a.f.k.KubernetesClusterDescr [INFO ] Create flink session cluster flink-deployment successfully, JobManager Web Interface: http://flink-deployment-rest.temporal:8081
{noformat}
The Deployment is created, but {{metadata.ownerReferences}} is empty, as confirmed via kubectl immediately afterwards. JOSDK therefore cannot link it back to the FlinkDeployment CR.
h3. Second cycle: still MISSING, now hits 409 AlreadyExists
{noformat}
2026-04-27 09:29:19,426 o.a.f.k.o.o.d.SessionObserver  [INFO ] Observing JobManager deployment. Previous status: DEPLOYING
2026-04-27 09:29:19,426 o.a.f.k.o.o.d.SessionObserver  [ERROR] Missing JobManager deployment
2026-04-27 09:29:19,475 o.a.f.k.o.l.AuditUtils         [INFO ] >>> Event[Job] | Warning | MISSING | Missing JobManager deployment
2026-04-27 09:29:19,755 o.a.f.k.o.s.NativeFlinkService [INFO ] Deploying session cluster
2026-04-27 09:29:19,828 o.a.f.k.KubernetesClusterDescr [WARN ] Failed to create the Kubernetes cluster "flink-deployment", try to clean up the residual resources.
2026-04-27 09:29:19,910 o.a.f.k.o.l.AuditUtils         [INFO ] >>> Event[Job] | Warning | ERROR | Could not create Kubernetes cluster "flink-deployment". -> Failure executing: POST at: https://10.100.0.1:443/apis/apps/v1/namespaces/temporal/deployments. Message: deployments.apps "flink-deployment" already exists. Received status: ... reason=AlreadyExists, status=Failure ...
{noformat}
The catch path runs 
[{{stopAndCleanupCluster()}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L307]
 which deletes the orphan, and the next cycle creates a new orphan. The full 
stack trace pinpoints [{{recoverSession()}} line 
124|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]:
{code:java}
org.apache.flink.kubernetes.operator.exception.ReconciliationException:
  org.apache.flink.client.deployment.ClusterDeploymentException:
    Could not create Kubernetes cluster "flink-deployment".
  at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitSessionCluster(AbstractFlinkService.java:232)
  at org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler.recoverSession(SessionReconciler.java:124)
  at org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler.reconcileOtherChanges(SessionReconciler.java:117)
  ...
Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
  Failure executing: POST at: .../apis/apps/v1/namespaces/temporal/deployments.
  Message: deployments.apps "flink-deployment" already exists. ... reason=AlreadyExists, status=Failure ...
 {code}
h3. Steady-state: MISSING / AlreadyExists every ~5–10 s

In the captured incident the loop ran for 38 minutes, producing *410 {{Missing JobManager deployment}} log lines* and matching {{AlreadyExists}} events, until manual intervention.
h3. kubectl evidence — the {{flink}} manager wrote with no ownerReferences

{{metadata.managedFields}} is the smoking gun. The operator's manager 
({{{}flink{}}}) has no {{f:ownerReferences}} in its tracked fields; only the 
manual {{kubectl patch}} (3 seconds later) wrote them:
{noformat}
$ kubectl get deployment flink-deployment -n temporal -o json --show-managed-fields \
  | jq '[.metadata.managedFields[] | {manager, operation, time, ownerRef: (.fieldsV1["f:metadata"]["f:ownerReferences"] // null)}]'
[
  {
    "manager": "flink",
    "operation": "Update",
    "time": "2026-04-27T10:07:42Z",
    "ownerRef": null                       ← bug: operator wrote NO ownerReferences
  },
  {
    "manager": "kubectl-patch",
    "operation": "Update",
    "time": "2026-04-27T10:07:45Z",        ← runbook fix applied 3 s later
    "ownerRef": {
      ".": {},
      "k:{\"uid\":\"bd14c6f9-1f8c-4c07-ab9f-2aec7b106b59\"}": {}
    }
  }
]{noformat}
The healthy comparison from a sibling cluster that never hit the bug — the 
{{flink}} manager itself wrote {{{}ownerReferences{}}}:
{noformat}
$ kubectl get deployment flink-deployment -n temporal -o json --show-managed-fields \
  | jq '[.metadata.managedFields[] | {manager, operation, time, ownerRef: (.fieldsV1["f:metadata"]["f:ownerReferences"] // null)}]'
[
  {
    "manager": "flink",
    "operation": "Update",
    "time": "2026-04-23T18:30:56Z",
    "ownerRef": {                          ← happy path: deploy() set ownerReferences
      ".": {},
      "k:{\"uid\":\"6274be7f-81f1-4ebe-8816-8d937da647fd\"}": {}
    }
  },
  {
    "manager": "kube-controller-manager",
    "operation": "Update",
    ...
  }
]{noformat}
h2. Real-world incident + workaround

Hit on a session cluster with HA enabled (Apache Flink Kubernetes Operator 
[{{release-1.14.0}}|https://github.com/apache/flink-kubernetes-operator/tree/release-1.14.0],
 image {{{}ghcr.io/apache/flink-kubernetes-operator:f504138{}}}, Flink 1.19.2). 
Trigger was an unrelated transient REST timeout reported by 
[{{SessionObserver}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java#L45]
 coinciding with the [JOSDK secondary 
lookup|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L115-L116]
 returning empty. Recovery ran, the recreated Deployment had no ownerReferences, and the cluster stayed in the loop until the ownerReferences were patched manually:
{code:bash}
FLINK_UID=$(kubectl get flinkdeployment <name> -n <ns> -o jsonpath='{.metadata.uid}')
kubectl patch deployment <name> -n <ns> --type=merge \
  -p "{\"metadata\":{\"ownerReferences\":[{\"apiVersion\":\"flink.apache.org/v1beta1\",\"kind\":\"FlinkDeployment\",\"name\":\"<name>\",\"uid\":\"$FLINK_UID\",\"controller\":true,\"blockOwnerDeletion\":true}]}}"
{code}
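
The nested shell quoting in the patch payload is easy to get wrong. As a hedged alternative, the same JSON can be assembled in Java and handed to whatever client applies the patch. {{ownerRefPatch()}} is a hypothetical helper and assumes that the name and UID need no JSON escaping, which holds for DNS-style resource names and UUIDs.

```java
/** Builds the merge-patch payload from the workaround above; a sketch, not operator code. */
class OwnerRefPatchSketch {

    /** Hypothetical helper; name and uid are assumed to contain no characters needing JSON escaping. */
    static String ownerRefPatch(String name, String uid) {
        return String.format(
                "{\"metadata\":{\"ownerReferences\":[{"
                        + "\"apiVersion\":\"flink.apache.org/v1beta1\","
                        + "\"kind\":\"FlinkDeployment\","
                        + "\"name\":\"%s\","
                        + "\"uid\":\"%s\","
                        + "\"controller\":true,"
                        + "\"blockOwnerDeletion\":true}]}}",
                name, uid);
    }

    public static void main(String[] args) {
        // The UID is the one visible in the managedFields output earlier in this report.
        System.out.println(ownerRefPatch("flink-deployment", "bd14c6f9-1f8c-4c07-ab9f-2aec7b106b59"));
    }
}
```

A merge patch replaces the whole {{ownerReferences}} list, so this payload is only appropriate when the Deployment has no other owners, as in the incident described here.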
h2. Possible fix

Add the missing 
[{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
 call in 
[{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]:
{code:java}
private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Exception {
    var conf = ctx.getObserveConfig();
    setOwnerReference(ctx.getResource(), conf);   // <-- add
    ctx.getFlinkService().submitSessionCluster(conf);
    ctx.getResource().getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
}
{code}

  was:
A session cluster whose JM Deployment goes missing enters an unrecoverable 
loop. The recovery path creates a Deployment without ownerReferences; JOSDK can 
no longer link it back to the FlinkDeployment CR; the observer reports MISSING 
again; recovery retries, hits 409 AlreadyExists, the catch path deletes the 
orphan, and the cycle restarts.

 
{noformat}
┌─ TRIGGER ────────────────────────────────────────────────────────────────────┐
│ JM Deployment becomes invisible to JOSDK informer                            │
│ (deletion, restart race, REST timeout coinciding with cache miss, …)         │
└─────────────────────────┬────────────────────────────────────────────────────┘
                          ▼
  AbstractFlinkDeploymentObserver
    .observeFlinkCluster()
      ctx.getJosdkContext().getSecondaryResource(Deployment.class)  → empty
      jobManagerDeploymentStatus = MISSING                          [line 153]
                          │
                          ▼
  SessionReconciler.reconcileOtherChanges()
    shouldRecoverDeployment(...)                                    → true
    recoverSession(ctx)                                             [BUG site]
      ctx.getFlinkService().submitSessionCluster(getObserveConfig())
        ── observeConfig has NO kubernetes.jobmanager.owner.reference ──
      NativeFlinkService.deploySessionCluster(conf)
        KubernetesClusterDescriptor.deployClusterInternal(...)
          KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification()
            KubernetesJobManagerParameters.getOwnerReference()      → []
          ┌──────────────────────────────────────────────────────┐
          │  Deployment created with .withOwnerReferences([])    │  ← no 
ownerRefs
          └──────────────────────────────────────────────────────┘
                          │
                          ▼
  Next reconcile cycle
    getSecondaryResource(Deployment.class)                          → empty
       (JOSDK maps secondary→primary via ownerRefs)
    jobManagerDeploymentStatus = MISSING (again)
    shouldRecoverDeployment(...)                                    → true
    recoverSession(ctx) → submitSessionCluster(...)
      Fabric8 .create(deployment)                                   → 409 
AlreadyExists
      catch { stopAndCleanupCluster(clusterId) }                    ← deletes 
the orphan
                          │
                          ▼
                  ┌── LOOP forever ──┐{noformat}
Code references for each frame:
 * {{{}getSecondaryResource(Deployment.class){}}}: 
[{{AbstractFlinkDeploymentObserver.java#L115-L116}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L115-L116]
 * {{MISSING}} set: 
[{{AbstractFlinkDeploymentObserver.java#L153}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L153]
 * {{{}shouldRecoverDeployment(...){}}}: 
[{{AbstractFlinkResourceReconciler.java#L498-L519}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L498-L519]
 * BUG — {{{}recoverSession(){}}}: 
[{{SessionReconciler.java#L123-L128}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
 * {{submitSessionCluster(observeConfig)}} call: 
[{{SessionReconciler.java#L124}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]
 * {{getOwnerReference()}} reading config: 
[{{KubernetesJobManagerParameters.java#L109-L113}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java#L109-L113]
 * {{.withOwnerReferences(...)}} on the Deployment: 
[{{KubernetesJobManagerFactory.java#L122-L126}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java#L122-L126]
 * 409 catch + cleanup: 
[{{KubernetesClusterDescriptor.java#L302-L312}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L302-L312]
 (deletes via 
[{{stopAndCleanupCluster}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L307])

The bug is *deterministic* on this code path — once 
[{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
 is invoked, the cluster cannot self-recover. Manual intervention (kubectl 
patch to add ownerReferences, or delete-and-recreate the 
[{{FlinkDeployment}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/FlinkDeployment.java]
 CR) is required to break the loop.


{{}}

The same {{submitSessionCluster()}} is called from two places, but only one 
populates the Flink config option that drives ownerReferences:

{{}}


{{}}
{noformat}
✅ HAPPY PATH                                ❌ BUG PATH
─────────────────────────                    ─────────────────────────
SessionReconciler.deploy()                   SessionReconciler.recoverSession()
  setOwnerReference(cr, deployConfig)          ── (missing) ──
   └─ deployConfig.set(
        JOB_MANAGER_OWNER_REFERENCE, ...)
  submitSessionCluster(deployConfig)           
submitSessionCluster(observeConfig)

         │                                            │
         ▼                                            ▼
  KubernetesJobManagerParameters              KubernetesJobManagerParameters
    .getOwnerReference()                        .getOwnerReference()
       reads JOB_MANAGER_OWNER_REFERENCE           reads 
JOB_MANAGER_OWNER_REFERENCE
       → [{apiVersion, kind, name, uid, ...}]      → []          (option not 
set)
         │                                            │
         ▼                                            ▼
  Deployment.metadata.ownerReferences =        
Deployment.metadata.ownerReferences = []
    [→ FlinkDeployment CR]                       (orphan — JOSDK can't 
link){noformat}
{{{}{}}}{{{}{}}}

Code:
 * 
[{{SessionReconciler.deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L98-L111]
 sets ownerRef at 
[L106|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L106]
 before [L107 
submit|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L107]
 * 
[{{SessionReconciler.recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
 submits at 
[L124|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]
 without 
[{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
 * 
[{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
 only mutates the per-call {{{}Configuration{}}}; it is *never persisted* to 
{{{}spec.flinkConfiguration{}}}, so 
[{{getObserveConfig()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java#L343-L352]
 never carries it
 * 
[{{ApplicationReconciler}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L270-L316]
 is unaffected: its recovery routes through 
[{{AbstractJobReconciler.restoreJob() → 
deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L373-L398],
 and 
[{{ApplicationReconciler.deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L141-L211]
 calls 
[{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L169]
 at L169
 * The gap dates back to FLINK-28979 / [commit 
{{62eb68c8}}|https://github.com/apache/flink-kubernetes-operator/commit/62eb68c812713378f7f66fde01cf14370a2252e4]
 (Oct 2022); the test 
[{{SessionReconcilerTest#testSetOwnerReference}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java#L130-L149]
 only exercises 
[{{deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L98-L111],
 not 
[{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
 * Still present on 
[{{main}}|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
 and in every release tag from 
[{{release-1.10.0}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.10.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L116-L121]
 through 
[{{release-1.14.0}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]

h2. Logs
h3. First MISSING + entry into {{recoverSession()}}

The observer reports {{MISSING}} and the bug-site is invoked. Note {{Previous 
status: READY}} — the JM Deployment was healthy moments before:
{noformat}
2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver  [ERROR] REST service in session cluster timed out
2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver  [INFO ] Observing JobManager deployment. Previous status: READY
2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver  [ERROR] Missing JobManager deployment
2026-04-27 09:29:18,701 o.a.f.k.o.l.AuditUtils         [INFO ] >>> Event[Job] | Warning | MISSING | Missing JobManager deployment
2026-04-27 09:29:18,882 o.a.f.k.o.s.NativeFlinkService [INFO ] Deploying session cluster
2026-04-27 09:29:19,118 o.a.f.k.KubernetesClusterDescr [INFO ] Create flink session cluster flink-deployment successfully, JobManager Web Interface: http://flink-deployment-rest.temporal:8081
{noformat}
The Deployment is created, but a kubectl check immediately afterwards confirmed 
that {{metadata.ownerReferences}} is empty, so JOSDK cannot link it back to the 
FlinkDeployment CR.
h3. Second cycle: still MISSING, now hits 409 AlreadyExists

{noformat}
2026-04-27 09:29:19,426 o.a.f.k.o.o.d.SessionObserver  [INFO ] Observing JobManager deployment. Previous status: DEPLOYING
2026-04-27 09:29:19,426 o.a.f.k.o.o.d.SessionObserver  [ERROR] Missing JobManager deployment
2026-04-27 09:29:19,475 o.a.f.k.o.l.AuditUtils         [INFO ] >>> Event[Job] | Warning | MISSING | Missing JobManager deployment
2026-04-27 09:29:19,755 o.a.f.k.o.s.NativeFlinkService [INFO ] Deploying session cluster
2026-04-27 09:29:19,828 o.a.f.k.KubernetesClusterDescr [WARN ] Failed to create the Kubernetes cluster "flink-deployment", try to clean up the residual resources.
2026-04-27 09:29:19,910 o.a.f.k.o.l.AuditUtils         [INFO ] >>> Event[Job] | Warning | ERROR | Could not create Kubernetes cluster "flink-deployment". -> Failure executing: POST at: https://10.100.0.1:443/apis/apps/v1/namespaces/temporal/deployments. Message: deployments.apps "flink-deployment" already exists. Received status: ... reason=AlreadyExists, status=Failure ...
{noformat}

The catch path runs 
[{{stopAndCleanupCluster()}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L307]
 which deletes the orphan, and the next cycle creates a new orphan. The full 
stack trace pinpoints [{{recoverSession()}} line 
124|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]:

{code:java}
org.apache.flink.kubernetes.operator.exception.ReconciliationException:
  org.apache.flink.client.deployment.ClusterDeploymentException:
    Could not create Kubernetes cluster "flink-deployment".
  at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitSessionCluster(AbstractFlinkService.java:232)
  at org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler.recoverSession(SessionReconciler.java:124)
  at org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler.reconcileOtherChanges(SessionReconciler.java:117)
  ...
Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
  Failure executing: POST at: .../apis/apps/v1/namespaces/temporal/deployments.
  Message: deployments.apps "flink-deployment" already exists. ... reason=AlreadyExists, status=Failure ...
{code}
h3. Steady-state: MISSING / AlreadyExists every ~5–10 s

In the captured incident the loop ran for 38 minutes with 
*410 {{Missing JobManager deployment}} log lines* and matching 
{{AlreadyExists}} events, until manual intervention.
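
The loop mechanics can be illustrated with a small self-contained toy model. This is a sketch under stated assumptions, not operator code: {{FakeCluster}}, {{findOwnedBy}} and {{countMissingCycles}} are hypothetical names, and the lookup only approximates the owner-reference-based mapping from Deployment to FlinkDeployment CR described above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Toy model of the MISSING / AlreadyExists loop; all names are hypothetical. */
final class OrphanLoopSimulation {

    /** Stand-in for the Kubernetes API: Deployment name -> owner UIDs. */
    static final class FakeCluster {
        private final Map<String, List<String>> deployments = new HashMap<>();

        /** Mimics a create call that fails when the name is already taken (409). */
        void create(String name, List<String> ownerUids) {
            if (deployments.containsKey(name)) {
                throw new IllegalStateException("AlreadyExists: " + name);
            }
            deployments.put(name, ownerUids);
        }

        void delete(String name) {
            deployments.remove(name);
        }

        /** Mimics the secondary lookup: only Deployments owned by the CR are visible. */
        Optional<String> findOwnedBy(String ownerUid) {
            return deployments.entrySet().stream()
                    .filter(e -> e.getValue().contains(ownerUid))
                    .map(Map.Entry::getKey)
                    .findFirst();
        }
    }

    /** Runs reconcile cycles and returns how many of them observed MISSING. */
    static int countMissingCycles(boolean setOwnerReferenceOnRecovery, int cycles) {
        String name = "flink-deployment";
        String crUid = "cr-uid";
        FakeCluster cluster = new FakeCluster(); // starts empty: the trigger
        int missing = 0;
        for (int i = 0; i < cycles; i++) {
            if (cluster.findOwnedBy(crUid).isPresent()) {
                continue; // observer finds the Deployment, nothing to recover
            }
            missing++;
            List<String> owners =
                    setOwnerReferenceOnRecovery ? List.of(crUid) : List.of();
            try {
                cluster.create(name, owners); // recovery path
            } catch (IllegalStateException alreadyExists) {
                cluster.delete(name); // catch path removes the orphan
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        System.out.println("without ownerRef: " + countMissingCycles(false, 10));
        System.out.println("with ownerRef:    " + countMissingCycles(true, 10));
    }
}
```

In this model every cycle reports MISSING when recovery omits the owner reference, alternating between creating an orphan and deleting it, while a single recovery suffices once the owner reference is set.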

h3. kubectl evidence — the {{flink}} manager wrote with no ownerReferences

{{metadata.managedFields}} is the smoking gun. The operator's manager 
({{{}flink{}}}) has no {{f:ownerReferences}} in its tracked fields; only the 
manual {{kubectl patch}} (3 seconds later) wrote them:
{noformat}
$ kubectl get deployment flink-deployment -n temporal -o json --show-managed-fields \
  | jq '[.metadata.managedFields[] | {manager, operation, time, ownerRef: (.fieldsV1["f:metadata"]["f:ownerReferences"] // null)}]'
[
  {
    "manager": "flink",
    "operation": "Update",
    "time": "2026-04-27T10:07:42Z",
    "ownerRef": null                       ← bug: operator wrote NO ownerReferences
  },
  {
    "manager": "kubectl-patch",
    "operation": "Update",
    "time": "2026-04-27T10:07:45Z",        ← runbook fix applied 3 s later
    "ownerRef": {
      ".": {},
      "k:{\"uid\":\"bd14c6f9-1f8c-4c07-ab9f-2aec7b106b59\"}": {}
    }
  }
]{noformat}
For comparison, on a healthy sibling cluster that never hit the bug, the 
{{flink}} manager itself wrote {{{}ownerReferences{}}}:
{noformat}
$ kubectl get deployment flink-deployment -n temporal -o json --show-managed-fields \
  | jq '[.metadata.managedFields[] | {manager, operation, time, ownerRef: (.fieldsV1["f:metadata"]["f:ownerReferences"] // null)}]'
[
  {
    "manager": "flink",
    "operation": "Update",
    "time": "2026-04-23T18:30:56Z",
    "ownerRef": {                          ← happy path: deploy() set ownerReferences
      ".": {},
      "k:{\"uid\":\"6274be7f-81f1-4ebe-8816-8d937da647fd\"}": {}
    }
  },
  {
    "manager": "kube-controller-manager",
    "operation": "Update",
    ...
  }
]{noformat}
h2. Real-world incident + workaround

Hit on a session cluster with HA enabled (Apache Flink Kubernetes Operator 
[{{release-1.14.0}}|https://github.com/apache/flink-kubernetes-operator/tree/release-1.14.0],
 image {{{}ghcr.io/apache/flink-kubernetes-operator:f504138{}}}, Flink 1.19.2). 
Trigger was an unrelated transient REST timeout reported by 
[{{SessionObserver}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java#L45]
 coinciding with the [JOSDK secondary 
lookup|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L115-L116]
 returning empty. Recovery ran, ownerRefs were missing on the recreated 
Deployment, and the cluster stayed stuck in the loop until the 
ownerReferences were patched manually:

{code:bash}
FLINK_UID=$(kubectl get flinkdeployment <name> -n <ns> -o jsonpath='{.metadata.uid}')
kubectl patch deployment <name> -n <ns> --type=merge \
  -p "{\"metadata\":{\"ownerReferences\":[{\"apiVersion\":\"flink.apache.org/v1beta1\",\"kind\":\"FlinkDeployment\",\"name\":\"<name>\",\"uid\":\"$FLINK_UID\",\"controller\":true,\"blockOwnerDeletion\":true}]}}"
{code}
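
The escaped patch body is easy to get wrong by hand. As a minimal sketch, the same merge-patch JSON can be generated by a small helper; {{OwnerReferencePatch}} and {{mergePatchBody}} are hypothetical names, and the helper assumes the name and UID contain no characters that need JSON escaping.

```java
/** Builds the merge-patch body used in the manual workaround; names are hypothetical. */
final class OwnerReferencePatch {

    /**
     * Returns the JSON merge-patch body that sets a single ownerReference
     * pointing at the FlinkDeployment CR. Inputs are inserted verbatim, so
     * they must not contain quotes or backslashes.
     */
    static String mergePatchBody(String name, String uid) {
        return String.format(
                "{\"metadata\":{\"ownerReferences\":[{"
                        + "\"apiVersion\":\"flink.apache.org/v1beta1\","
                        + "\"kind\":\"FlinkDeployment\","
                        + "\"name\":\"%s\",\"uid\":\"%s\","
                        + "\"controller\":true,\"blockOwnerDeletion\":true}]}}",
                name, uid);
    }

    public static void main(String[] args) {
        // Sample values for illustration only; pass the real CR name and UID.
        System.out.println(mergePatchBody("flink-deployment", "example-uid"));
    }
}
```

The printed string can be passed to {{kubectl patch ... --type=merge -p}} in single quotes, which avoids the nested shell escaping.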
h2. Possible fix

Add the missing 
[{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
 call in 
[{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]:

{code:java}
private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Exception {
    var conf = ctx.getObserveConfig();
    setOwnerReference(ctx.getResource(), conf);   // <-- add
    ctx.getFlinkService().submitSessionCluster(conf);
    ctx.getResource().getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
}
{code}
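
The shape of a regression check for this fix could look like the following self-contained sketch. It deliberately does not use the operator's classes: the configuration is modelled as a plain {{Map}}, the helper names are hypothetical stand-ins, and the owner-reference value is a placeholder rather than the real serialized format. Only the key {{kubernetes.jobmanager.owner.reference}} is taken from the report.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy regression check for the recovery path; all helpers are hypothetical. */
final class RecoverSessionOwnerRefCheck {

    static final String OWNER_REF_KEY = "kubernetes.jobmanager.owner.reference";

    /** Stand-in for getObserveConfig(): rebuilt from the spec on every call. */
    static Map<String, String> observeConfig(Map<String, String> specFlinkConfiguration) {
        return new HashMap<>(specFlinkConfiguration);
    }

    /** Stand-in for setOwnerReference(): mutates only the per-call config. */
    static void setOwnerReference(String crName, String crUid, Map<String, String> conf) {
        // Placeholder value; the real option uses its own serialized format.
        conf.put(OWNER_REF_KEY, "name:" + crName + ",uid:" + crUid);
    }

    /** Returns the config the recovery path would submit, with or without the fix. */
    static Map<String, String> recoverSessionConfig(
            Map<String, String> specFlinkConfiguration, boolean withFix) {
        Map<String, String> conf = observeConfig(specFlinkConfiguration);
        if (withFix) {
            setOwnerReference("flink-deployment", "cr-uid", conf);
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> spec = Map.of("taskmanager.numberOfTaskSlots", "2");
        System.out.println("before fix: "
                + recoverSessionConfig(spec, false).containsKey(OWNER_REF_KEY));
        System.out.println("after fix:  "
                + recoverSessionConfig(spec, true).containsKey(OWNER_REF_KEY));
    }
}
```

A real test would assert the same property against the configuration captured by the operator's test Flink service after driving the reconciler through the recovery path.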
> SessionReconciler.recoverSession() creates JobManager Deployment without 
> ownerReferences, causing unrecoverable MISSING / AlreadyExists loop
> --------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39559
>                 URL: https://issues.apache.org/jira/browse/FLINK-39559
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: 1.10.0, 1.11.0, 1.12.0, 1.12.1, 1.13.0, 1.14.0
>         Environment: operator: 1.14.0
> flink: 1.19.2
>            Reporter: Krzysztof Palka
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)