Krzysztof Palka created FLINK-39559:
---------------------------------------

             Summary: SessionReconciler.recoverSession() creates JobManager Deployment without ownerReferences, causing unrecoverable MISSING / AlreadyExists loop
                 Key: FLINK-39559
                 URL: https://issues.apache.org/jira/browse/FLINK-39559
             Project: Flink
          Issue Type: Bug
          Components: Kubernetes Operator
    Affects Versions: 1.14.0, 1.13.0, 1.12.1, 1.12.0, 1.11.0, 1.10.0
         Environment: operator: 1.14.0
flink: 1.19.2
            Reporter: Krzysztof Palka


A session cluster whose JobManager (JM) Deployment goes missing enters an unrecoverable loop. The recovery path recreates the Deployment without ownerReferences, so JOSDK can no longer link it back to the FlinkDeployment CR. The observer reports MISSING again, recovery retries and hits 409 AlreadyExists, the catch path deletes the orphan, and the cycle restarts.

{noformat}
┌─ TRIGGER ────────────────────────────────────────────────────────────────────┐
│ JM Deployment becomes invisible to JOSDK informer                            │
│ (deletion, restart race, REST timeout coinciding with cache miss, …)         │
└─────────────────────────┬────────────────────────────────────────────────────┘
                          ▼
  AbstractFlinkDeploymentObserver
    .observeFlinkCluster()
      ctx.getJosdkContext().getSecondaryResource(Deployment.class)  → empty
      jobManagerDeploymentStatus = MISSING                          [line 153]
                          │
                          ▼
  SessionReconciler.reconcileOtherChanges()
    shouldRecoverDeployment(...)                                    → true
    recoverSession(ctx)                                             [BUG site]
      ctx.getFlinkService().submitSessionCluster(getObserveConfig())
        ── observeConfig has NO kubernetes.jobmanager.owner.reference ──
      NativeFlinkService.deploySessionCluster(conf)
        KubernetesClusterDescriptor.deployClusterInternal(...)
          KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification()
            KubernetesJobManagerParameters.getOwnerReference()      → []
          ┌──────────────────────────────────────────────────────┐
          │  Deployment created with .withOwnerReferences([])    │  ← no ownerRefs
          └──────────────────────────────────────────────────────┘
                          │
                          ▼
  Next reconcile cycle
    getSecondaryResource(Deployment.class)                          → empty
       (JOSDK maps secondary→primary via ownerRefs)
    jobManagerDeploymentStatus = MISSING (again)
    shouldRecoverDeployment(...)                                    → true
    recoverSession(ctx) → submitSessionCluster(...)
      Fabric8 .create(deployment)                                   → 409 AlreadyExists
      catch { stopAndCleanupCluster(clusterId) }                    ← deletes the orphan
                          │
                          ▼
                  ┌── LOOP forever ──┐{noformat}
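To make the cycle concrete, the toy model below replays it in plain Java. It is a hypothetical sketch, not operator code: {{RecoveryLoopModel}} and its methods are invented for illustration, the JOSDK secondary lookup is reduced to comparing the Deployment's owner UID with the CR's UID, and the 409 handling is collapsed into a single "delete the orphan" step. The {{recoveryWritesOwnerRef}} flag switches between the current behaviour ({{false}}) and the proposed fix ({{true}}).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical toy model of the MISSING / AlreadyExists loop; not operator code. */
final class RecoveryLoopModel {

    /** A JM Deployment stored in the API server; ownerUid == null means no ownerReferences. */
    record Deployment(String name, String ownerUid) {}

    private final String crUid;
    private final boolean recoveryWritesOwnerRef;
    private Deployment inApiServer; // null while no Deployment exists in Kubernetes
    final List<String> events = new ArrayList<>();

    RecoveryLoopModel(String crUid, boolean recoveryWritesOwnerRef) {
        this.crUid = crUid;
        this.recoveryWritesOwnerRef = recoveryWritesOwnerRef;
    }

    /** Simplified secondary lookup: a Deployment is only visible if the CR owns it. */
    private Optional<Deployment> secondaryLookup() {
        return Optional.ofNullable(inApiServer).filter(d -> crUid.equals(d.ownerUid()));
    }

    /** One reconcile cycle: observe, then recover when MISSING. Returns true once READY. */
    boolean reconcileOnce() {
        if (secondaryLookup().isPresent()) {
            events.add("READY");
            return true;
        }
        events.add("MISSING");
        if (inApiServer != null) {
            // The create call hits 409 AlreadyExists and the catch path deletes the orphan.
            events.add("409 AlreadyExists, orphan deleted");
            inApiServer = null;
        } else {
            // recoverSession(): the owner is only recorded when the fix is applied.
            inApiServer = new Deployment("flink-deployment", recoveryWritesOwnerRef ? crUid : null);
            events.add("created");
        }
        return false;
    }

    /** Runs at most maxCycles reconciles; returns whether READY was ever reached. */
    boolean runUntilReady(int maxCycles) {
        for (int i = 0; i < maxCycles; i++) {
            if (reconcileOnce()) {
                return true;
            }
        }
        return false;
    }
}
```

With the flag off, the event list alternates {{MISSING}}, {{created}}, {{MISSING}}, {{409 AlreadyExists, orphan deleted}} and never reaches READY, which matches the logs further down; with the flag on, the second cycle already finds the Deployment.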
Code references for each frame:
 * {{getSecondaryResource(Deployment.class)}}: [{{AbstractFlinkDeploymentObserver.java#L115-L116}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L115-L116]
 * {{MISSING}} set: [{{AbstractFlinkDeploymentObserver.java#L153}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L153]
 * {{shouldRecoverDeployment(...)}}: [{{AbstractFlinkResourceReconciler.java#L498-L519}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L498-L519]
 * BUG site, {{recoverSession()}}: [{{SessionReconciler.java#L123-L128}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
 * {{submitSessionCluster(observeConfig)}} call: [{{SessionReconciler.java#L124}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]
 * {{getOwnerReference()}} reading config: [{{KubernetesJobManagerParameters.java#L109-L113}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java#L109-L113]
 * {{.withOwnerReferences(...)}} on the Deployment: [{{KubernetesJobManagerFactory.java#L122-L126}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java#L122-L126]
 * 409 catch + cleanup: [{{KubernetesClusterDescriptor.java#L302-L312}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L302-L312] (deletes via [{{stopAndCleanupCluster}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L307])
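Every frame above depends on one mapping step: the informer links a Deployment back to its FlinkDeployment only through {{metadata.ownerReferences}}. The sketch below assumes a mapper that matches owners on {{apiVersion}} and {{kind}}; the {{OwnerReference}} and {{ResourceId}} records are illustrative stand-ins, not the fabric8 or JOSDK types. An empty owner list maps to no primary at all, which is what {{getSecondaryResource(Deployment.class)}} then surfaces as an empty result.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical owner-reference mapper; the records are stand-ins, not fabric8/JOSDK types. */
final class OwnerRefMapping {

    record OwnerReference(String apiVersion, String kind, String name, String uid) {}

    record ResourceId(String name, String namespace) {}

    /**
     * Returns the primaries a secondary resource maps to: one per owner reference whose
     * apiVersion and kind match the primary type. Namespaced owners share the secondary's namespace.
     */
    static Set<ResourceId> primariesOf(
            List<OwnerReference> ownerReferences,
            String namespace,
            String primaryApiVersion,
            String primaryKind) {
        return ownerReferences.stream()
                .filter(ref -> primaryApiVersion.equals(ref.apiVersion()))
                .filter(ref -> primaryKind.equals(ref.kind()))
                .map(ref -> new ResourceId(ref.name(), namespace))
                .collect(Collectors.toSet());
    }
}
```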

The bug is *deterministic* on this code path: once [{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128] is invoked, the cluster cannot recover on its own. Manual intervention (a {{kubectl patch}} to add ownerReferences, or deleting and recreating the [{{FlinkDeployment}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/FlinkDeployment.java] CR) is required to break the loop.


The same {{submitSessionCluster()}} is called from two places, but only one populates the Flink config option that drives ownerReferences:

{noformat}
✅ HAPPY PATH                                ❌ BUG PATH
─────────────────────────                    ─────────────────────────
SessionReconciler.deploy()                   SessionReconciler.recoverSession()
  setOwnerReference(cr, deployConfig)          ── (missing) ──
   └─ deployConfig.set(
        JOB_MANAGER_OWNER_REFERENCE, ...)
  submitSessionCluster(deployConfig)           submitSessionCluster(observeConfig)

         │                                            │
         ▼                                            ▼
  KubernetesJobManagerParameters              KubernetesJobManagerParameters
    .getOwnerReference()                        .getOwnerReference()
       reads JOB_MANAGER_OWNER_REFERENCE           reads JOB_MANAGER_OWNER_REFERENCE
       → [{apiVersion, kind, name, uid, ...}]      → []          (option not set)
         │                                            │
         ▼                                            ▼
  Deployment.metadata.ownerReferences =        Deployment.metadata.ownerReferences = []
    [→ FlinkDeployment CR]                       (orphan: JOSDK can't link){noformat}

Code:
 * [{{SessionReconciler.deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L98-L111] sets the owner reference at [L106|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L106] before the [L107 submit|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L107]
 * [{{SessionReconciler.recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128] submits at [L124|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124] without calling [{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
 * [{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554] only mutates the per-call {{Configuration}}; it is *never persisted* to {{spec.flinkConfiguration}}, so [{{getObserveConfig()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java#L343-L352] never carries it.
 * [{{ApplicationReconciler}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L270-L316] is unaffected: its recovery routes through [{{AbstractJobReconciler.restoreJob() → deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L373-L398], and [{{ApplicationReconciler.deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L141-L211] calls [{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L169] at L169.
 * The gap dates back to FLINK-28979 / [commit {{62eb68c8}}|https://github.com/apache/flink-kubernetes-operator/commit/62eb68c812713378f7f66fde01cf14370a2252e4] (Oct 2022); the test [{{SessionReconcilerTest#testSetOwnerReference}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java#L130-L149] only exercises [{{deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L98-L111], not [{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128].
 * Still present on [{{main}}|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128] and in every release tag from [{{release-1.10.0}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.10.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L116-L121] through [{{release-1.14.0}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128].
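The scoping issue described in the list above fits in a few lines. In this hypothetical model, {{Configuration}} is reduced to a {{Map}}, both submit paths start from a fresh copy of {{spec.flinkConfiguration}}, and the owner-reference value is a placeholder rather than Flink's real serialization of the option. Because {{setOwnerReference}} only touches the per-call copy, a successful {{deploy}} earlier leaves nothing behind for a later recovery to pick up.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the two submit paths; Configuration is reduced to a Map. */
final class OwnerRefConfigScope {

    /** Same key string as the Flink option that drives ownerReferences. */
    static final String OWNER_REF_KEY = "kubernetes.jobmanager.owner.reference";

    /** Stand-in for spec.flinkConfiguration; nothing in this model ever writes to it. */
    private final Map<String, String> specFlinkConfiguration;

    OwnerRefConfigScope(Map<String, String> specFlinkConfiguration) {
        this.specFlinkConfiguration = Map.copyOf(specFlinkConfiguration);
    }

    /** Each call derives a fresh, mutable config from the spec. */
    private Map<String, String> configFromSpec() {
        return new HashMap<>(specFlinkConfiguration);
    }

    /** Mutates only the config it is given; the value is a placeholder, not Flink's format. */
    static void setOwnerReference(String crUid, Map<String, String> conf) {
        conf.put(OWNER_REF_KEY, "kind:FlinkDeployment,uid:" + crUid);
    }

    /** deploy(): owner reference set on the per-call config before submitting. */
    Map<String, String> deployConfig(String crUid) {
        Map<String, String> conf = configFromSpec();
        setOwnerReference(crUid, conf);
        return conf;
    }

    /** recoverSession() as of release-1.14.0: the observe config is submitted as-is. */
    Map<String, String> recoverConfig() {
        return configFromSpec();
    }
}
```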

h2. Logs
h3. First MISSING + entry into {{recoverSession()}}

The observer reports {{MISSING}} and the bug site is invoked. Note {{Previous status: READY}}, which shows the JM Deployment was healthy moments before:
{noformat}
2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver  [ERROR] REST service in session cluster timed out
2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver  [INFO ] Observing JobManager deployment. Previous status: READY
2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver  [ERROR] Missing JobManager deployment
2026-04-27 09:29:18,701 o.a.f.k.o.l.AuditUtils         [INFO ] >>> Event[Job] | Warning | MISSING | Missing JobManager deployment
2026-04-27 09:29:18,882 o.a.f.k.o.s.NativeFlinkService [INFO ] Deploying session cluster
2026-04-27 09:29:19,118 o.a.f.k.KubernetesClusterDescr [INFO ] Create flink session cluster flink-deployment successfully, JobManager Web Interface: http://flink-deployment-rest.temporal:8081
{noformat}
The Deployment is created, but kubectl inspection immediately afterwards confirmed that {{metadata.ownerReferences}} is empty, so JOSDK cannot link it back to the FlinkDeployment CR.
h3. Second cycle: still MISSING, now hits 409 AlreadyExists

{noformat}
2026-04-27 09:29:19,426 o.a.f.k.o.o.d.SessionObserver  [INFO ] Observing JobManager deployment. Previous status: DEPLOYING
2026-04-27 09:29:19,426 o.a.f.k.o.o.d.SessionObserver  [ERROR] Missing JobManager deployment
2026-04-27 09:29:19,475 o.a.f.k.o.l.AuditUtils         [INFO ] >>> Event[Job] | Warning | MISSING | Missing JobManager deployment
2026-04-27 09:29:19,755 o.a.f.k.o.s.NativeFlinkService [INFO ] Deploying session cluster
2026-04-27 09:29:19,828 o.a.f.k.KubernetesClusterDescr [WARN ] Failed to create the Kubernetes cluster "flink-deployment", try to clean up the residual resources.
2026-04-27 09:29:19,910 o.a.f.k.o.l.AuditUtils         [INFO ] >>> Event[Job] | Warning | ERROR | Could not create Kubernetes cluster "flink-deployment". -> Failure executing: POST at: https://10.100.0.1:443/apis/apps/v1/namespaces/temporal/deployments. Message: deployments.apps "flink-deployment" already exists. Received status: ... reason=AlreadyExists, status=Failure ...
{noformat}

The catch path runs [{{stopAndCleanupCluster()}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L307], which deletes the orphan, and the next cycle creates a new orphan. The full stack trace pinpoints [{{recoverSession()}} line 124|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]:

{code:java}
org.apache.flink.kubernetes.operator.exception.ReconciliationException:
  org.apache.flink.client.deployment.ClusterDeploymentException:
    Could not create Kubernetes cluster "flink-deployment".
  at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitSessionCluster(AbstractFlinkService.java:232)
  at org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler.recoverSession(SessionReconciler.java:124)
  at org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler.reconcileOtherChanges(SessionReconciler.java:117)
  ...
Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
  Failure executing: POST at: .../apis/apps/v1/namespaces/temporal/deployments.
  Message: deployments.apps "flink-deployment" already exists. ... reason=AlreadyExists, status=Failure ...
 {code}
h3. Steady-state: MISSING / AlreadyExists every ~5–10 s

In the captured incident, the loop ran for 38 minutes, producing *410 {{Missing JobManager deployment}} log lines* and matching {{AlreadyExists}} events, until it was broken by manual intervention.

h3. kubectl evidence: the {{flink}} manager wrote the Deployment without ownerReferences

{{metadata.managedFields}} is the smoking gun. The operator's manager ({{flink}}) has no {{f:ownerReferences}} in its tracked fields; only the manual {{kubectl patch}}, applied 3 seconds later, wrote them:
{noformat}
$ kubectl get deployment flink-deployment -n temporal -o json --show-managed-fields \
  | jq '[.metadata.managedFields[] | {manager, operation, time, ownerRef: (.fieldsV1["f:metadata"]["f:ownerReferences"] // null)}]'
[
  {
    "manager": "flink",
    "operation": "Update",
    "time": "2026-04-27T10:07:42Z",
    "ownerRef": null                       ← bug: operator wrote NO ownerReferences
  },
  {
    "manager": "kubectl-patch",
    "operation": "Update",
    "time": "2026-04-27T10:07:45Z",        ← runbook fix applied 3 s later
    "ownerRef": {
      ".": {},
      "k:{\"uid\":\"bd14c6f9-1f8c-4c07-ab9f-2aec7b106b59\"}": {}
    }
  }
]{noformat}
For comparison, a healthy sibling cluster that never hit the bug, where the {{flink}} manager itself wrote {{ownerReferences}}:
{noformat}
$ kubectl get deployment flink-deployment -n temporal -o json --show-managed-fields \
  | jq '[.metadata.managedFields[] | {manager, operation, time, ownerRef: (.fieldsV1["f:metadata"]["f:ownerReferences"] // null)}]'
[
  {
    "manager": "flink",
    "operation": "Update",
    "time": "2026-04-23T18:30:56Z",
    "ownerRef": {                          ← happy path: deploy() set ownerReferences
      ".": {},
      "k:{\"uid\":\"6274be7f-81f1-4ebe-8816-8d937da647fd\"}": {}
    }
  },
  {
    "manager": "kube-controller-manager",
    "operation": "Update",
    ...
  }
]{noformat}
h2. Real-world incident + workaround

Hit on a session cluster with HA enabled (Apache Flink Kubernetes Operator [{{release-1.14.0}}|https://github.com/apache/flink-kubernetes-operator/tree/release-1.14.0], image {{ghcr.io/apache/flink-kubernetes-operator:f504138}}, Flink 1.19.2). The trigger was an unrelated transient REST timeout reported by [{{SessionObserver}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java#L45], coinciding with the [JOSDK secondary lookup|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L115-L116] returning empty. Recovery ran, the recreated Deployment had no ownerReferences, and the cluster stayed stuck in the loop until the ownerReferences were patched in manually with:

{code:bash}
FLINK_UID=$(kubectl get flinkdeployment <name> -n <ns> -o jsonpath='{.metadata.uid}')
kubectl patch deployment <name> -n <ns> --type=merge \
  -p "{\"metadata\":{\"ownerReferences\":[{\"apiVersion\":\"flink.apache.org/v1beta1\",\"kind\":\"FlinkDeployment\",\"name\":\"<name>\",\"uid\":\"$FLINK_UID\",\"controller\":true,\"blockOwnerDeletion\":true}]}}"
{code}
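If the workaround is scripted from Java rather than a shell, the merge-patch body is the same JSON document. A sketch with a hypothetical helper name; it deliberately skips JSON escaping, which is only safe because Kubernetes object names (DNS subdomain names) and UIDs cannot contain quotes or backslashes:

```java
/** Hypothetical helper that builds the merge-patch body used by the kubectl workaround. */
final class OwnerRefPatch {

    /**
     * Returns a merge-patch body whose metadata.ownerReferences points at the given FlinkDeployment.
     * No JSON escaping is done: Kubernetes object names and UIDs never contain quotes or backslashes.
     */
    static String mergePatchBody(String crName, String crUid) {
        return "{\"metadata\":{\"ownerReferences\":[{"
                + "\"apiVersion\":\"flink.apache.org/v1beta1\","
                + "\"kind\":\"FlinkDeployment\","
                + "\"name\":\"" + crName + "\","
                + "\"uid\":\"" + crUid + "\","
                + "\"controller\":true,"
                + "\"blockOwnerDeletion\":true}]}}";
    }
}
```

The returned string can be passed as the {{-p}} argument of {{kubectl patch --type=merge}} or to any client's JSON merge-patch call. A JSON merge patch replaces the whole {{ownerReferences}} list, so any existing entries would have to be included as well.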
h2. Possible fix

Add the missing [{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554] call in [{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]:

{code:java}
private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Exception {
    var conf = ctx.getObserveConfig();
    setOwnerReference(ctx.getResource(), conf);   // <-- add
    ctx.getFlinkService().submitSessionCluster(conf);
    ctx.getResource().getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
}
{code}
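Since {{SessionReconcilerTest#testSetOwnerReference}} only covers {{deploy()}}, a regression test for the fix should also assert that {{recoverSession()}} hands the owner reference to the Flink service. The sketch below shows the shape of such a check against a hypothetical recording test double; it does not use the real {{SessionReconciler}}, {{FlinkResourceContext}} or {{Configuration}} classes, and the owner-reference value is a placeholder.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical regression check for the proposed fix; not the real SessionReconciler. */
final class RecoverSessionFixCheck {

    static final String OWNER_REF_KEY = "kubernetes.jobmanager.owner.reference";

    /** Test double that records the config handed to submitSessionCluster. */
    static final class RecordingFlinkService {
        Map<String, String> lastSubmitted;

        void submitSessionCluster(Map<String, String> conf) {
            lastSubmitted = Map.copyOf(conf);
        }
    }

    /** Placeholder value; the real option holds a list of owner-reference maps. */
    static void setOwnerReference(String crUid, Map<String, String> conf) {
        conf.put(OWNER_REF_KEY, "kind:FlinkDeployment,uid:" + crUid);
    }

    /** Happy path: deploy() sets the owner reference on its per-call config. */
    static void deploy(String crUid, Map<String, String> spec, RecordingFlinkService service) {
        Map<String, String> conf = new HashMap<>(spec);
        setOwnerReference(crUid, conf);
        service.submitSessionCluster(conf);
    }

    /** recoverSession() with the proposed one-line fix applied. */
    static void recoverSession(String crUid, Map<String, String> spec, RecordingFlinkService service) {
        Map<String, String> conf = new HashMap<>(spec); // stand-in for ctx.getObserveConfig()
        setOwnerReference(crUid, conf); // the added call
        service.submitSessionCluster(conf);
    }
}
```

A check of this shape, placed next to {{testSetOwnerReference}}, would cover the path that test currently skips.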
 

 

 


