[ https://issues.apache.org/jira/browse/FLINK-39559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gyula Fora closed FLINK-39559.
------------------------------
    Fix Version/s: kubernetes-operator-1.15.0
         Assignee: Krzysztof Palka
       Resolution: Fixed

merged to main e3e0cbe42f998248c11edc88628d4da4e63046d6

> SessionReconciler.recoverSession() creates JobManager Deployment without ownerReferences, causing unrecoverable MISSING / AlreadyExists loop
> --------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39559
>                 URL: https://issues.apache.org/jira/browse/FLINK-39559
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: 1.10.0, 1.11.0, 1.12.0, 1.12.1, 1.13.0, 1.14.0
>         Environment: operator: 1.14.0
> flink: 1.19.2
>            Reporter: Krzysztof Palka
>            Assignee: Krzysztof Palka
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: kubernetes-operator-1.15.0
>
>
> A session cluster whose JM Deployment goes missing enters an unrecoverable 
> loop. The recovery path creates a Deployment without ownerReferences; JOSDK 
> can no longer link it back to the FlinkDeployment CR; the observer reports 
> MISSING again; recovery retries, hits 409 AlreadyExists, the catch path 
> deletes the orphan, and the cycle restarts.
>  
> {noformat}
> ┌─ TRIGGER ────────────────────────────────────────────────────────────────────┐
> │ JM Deployment becomes invisible to JOSDK informer                            │
> │ (deletion, restart race, REST timeout coinciding with cache miss, …)         │
> └─────────────────────────┬────────────────────────────────────────────────────┘
>                           ▼
>   AbstractFlinkDeploymentObserver
>     .observeFlinkCluster()
>       ctx.getJosdkContext().getSecondaryResource(Deployment.class)  → empty
>       jobManagerDeploymentStatus = MISSING                          [line 153]
>                           │
>                           ▼
>   SessionReconciler.reconcileOtherChanges()
>     shouldRecoverDeployment(...)                                    → true
>     recoverSession(ctx)                                             [BUG site]
>       ctx.getFlinkService().submitSessionCluster(getObserveConfig())
>         ── observeConfig has NO kubernetes.jobmanager.owner.reference ──
>       NativeFlinkService.deploySessionCluster(conf)
>         KubernetesClusterDescriptor.deployClusterInternal(...)
>           KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification()
>             KubernetesJobManagerParameters.getOwnerReference()      → []
>           ┌──────────────────────────────────────────────────────┐
>           │  Deployment created with .withOwnerReferences([])    │  ← no ownerRefs
>           └──────────────────────────────────────────────────────┘
>                           │
>                           ▼
>   Next reconcile cycle
>     getSecondaryResource(Deployment.class)                          → empty
>        (JOSDK maps secondary→primary via ownerRefs)
>     jobManagerDeploymentStatus = MISSING (again)
>     shouldRecoverDeployment(...)                                    → true
>     recoverSession(ctx) → submitSessionCluster(...)
>       Fabric8 .create(deployment)                                   → 409 AlreadyExists
>       catch { stopAndCleanupCluster(clusterId) }                    ← deletes the orphan
>                           │
>                           ▼
>                   ┌── LOOP forever ──┐{noformat}
> Code references for each frame:
>  * {{getSecondaryResource(Deployment.class)}}: 
> [{{AbstractFlinkDeploymentObserver.java#L115-L116}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L115-L116]
>  * {{MISSING}} set: 
> [{{AbstractFlinkDeploymentObserver.java#L153}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L153]
>  * {{shouldRecoverDeployment(...)}}: 
> [{{AbstractFlinkResourceReconciler.java#L498-L519}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L498-L519]
>  * {{recoverSession()}} (BUG site): 
> [{{SessionReconciler.java#L123-L128}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
>  * {{submitSessionCluster(observeConfig)}} call: 
> [{{SessionReconciler.java#L124}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]
>  * {{getOwnerReference()}} reading config: 
> [{{KubernetesJobManagerParameters.java#L109-L113}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java#L109-L113]
>  * {{.withOwnerReferences(...)}} on the Deployment: 
> [{{KubernetesJobManagerFactory.java#L122-L126}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java#L122-L126]
>  * 409 catch + cleanup: 
> [{{KubernetesClusterDescriptor.java#L302-L312}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L302-L312]
>  (deletes via 
> [{{stopAndCleanupCluster}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L307])
> The bug is *deterministic* on this code path — once 
> [{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
>  is invoked, the cluster cannot self-recover. Manual intervention (kubectl 
> patch to add ownerReferences, or delete-and-recreate the 
> [{{FlinkDeployment}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/FlinkDeployment.java]
>  CR) is required to break the loop.
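The loop cannot converge on its own because each recovery attempt has only two outcomes. The toy state machine below is a hypothetical simplification (one Deployment name, no timing, no HA), not operator code. It assumes that a Deployment is visible to the observer only if the CR owns it, and that creating a Deployment whose name is already taken triggers the 409 cleanup path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the MISSING / AlreadyExists loop; not operator code.
final class RecoveryLoopModel {
    // State of the single JM Deployment name in the namespace.
    enum JmDeployment { ABSENT, ORPHAN, OWNED }

    private JmDeployment state = JmDeployment.ABSENT;
    private final boolean recoverySetsOwnerRef;
    final List<String> events = new ArrayList<>();

    RecoveryLoopModel(boolean recoverySetsOwnerRef) {
        this.recoverySetsOwnerRef = recoverySetsOwnerRef;
    }

    // One reconcile cycle; returns true once the observer can see the Deployment.
    boolean reconcileOnce() {
        // Observer: only an owned Deployment is linked back to the CR.
        if (state == JmDeployment.OWNED) {
            events.add("READY");
            return true;
        }
        events.add("MISSING");
        // recoverSession(): submit the session cluster again.
        if (state == JmDeployment.ORPHAN) {
            // Name already taken: 409 AlreadyExists, and the catch path deletes the orphan.
            events.add("AlreadyExists -> cleanup");
            state = JmDeployment.ABSENT;
        } else {
            state = recoverySetsOwnerRef ? JmDeployment.OWNED : JmDeployment.ORPHAN;
            events.add("created");
        }
        return false;
    }

    // Runs at most maxCycles cycles; returns the 1-based cycle that saw READY, or -1.
    int cyclesUntilReady(int maxCycles) {
        for (int cycle = 1; cycle <= maxCycles; cycle++) {
            if (reconcileOnce()) {
                return cycle;
            }
        }
        return -1;
    }
}
```

With {{new RecoveryLoopModel(false)}}, the events alternate between "MISSING, created" and "MISSING, AlreadyExists -> cleanup", and {{cyclesUntilReady(100)}} returns -1. With {{true}} (owner reference set on recovery), it returns 2.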
> h2. Root cause
> The same {{submitSessionCluster()}} is called from two places, but only one 
> populates the Flink config option that drives ownerReferences:
> {noformat}
> ✅ HAPPY PATH                                ❌ BUG PATH
> ─────────────────────────                    ─────────────────────────
> SessionReconciler.deploy()                   SessionReconciler.recoverSession()
>   setOwnerReference(cr, deployConfig)          ── (missing) ──
>    └─ deployConfig.set(
>         JOB_MANAGER_OWNER_REFERENCE, ...)
>   submitSessionCluster(deployConfig)           submitSessionCluster(observeConfig)
>          │                                            │
>          ▼                                            ▼
>   KubernetesJobManagerParameters              KubernetesJobManagerParameters
>     .getOwnerReference()                        .getOwnerReference()
>        reads JOB_MANAGER_OWNER_REFERENCE           reads JOB_MANAGER_OWNER_REFERENCE
>        → [{apiVersion, kind, name, uid, ...}]      → []          (option not set)
>          │                                            │
>          ▼                                            ▼
>   Deployment.metadata.ownerReferences =        Deployment.metadata.ownerReferences = []
>     [→ FlinkDeployment CR]                       (orphan, JOSDK can't link){noformat}
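The two paths can be modelled in a minimal sketch that uses a plain {{Map<String, String>}} as a stand-in for Flink's {{Configuration}}. The key {{kubernetes.jobmanager.owner.reference}} is the Flink option named in the trace above, and the value follows that option's format: owner references separated by {{;}}, {{key:value}} pairs separated by {{,}}. The parsing is a hand-rolled approximation rather than Flink's parser, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a Map stands in for Flink's Configuration.
final class OwnerRefConfigModel {
    static final String OWNER_REF_KEY = "kubernetes.jobmanager.owner.reference";

    private OwnerRefConfigModel() {}

    // Happy path (deploy()): copy the base config and add the owner reference to the copy only.
    static Map<String, String> withOwnerReference(Map<String, String> base, String name, String uid) {
        Map<String, String> conf = new HashMap<>(base);
        conf.put(OWNER_REF_KEY,
                "apiVersion:flink.apache.org/v1beta1,kind:FlinkDeployment,name:" + name
                        + ",uid:" + uid + ",controller:true,blockOwnerDeletion:true");
        return conf;
    }

    // Approximation of getOwnerReference(): an unset option yields an empty list.
    // Assumes well-formed input, i.e. every pair contains a ':'.
    static List<Map<String, String>> ownerReferences(Map<String, String> conf) {
        List<Map<String, String>> refs = new ArrayList<>();
        String raw = conf.get(OWNER_REF_KEY);
        if (raw == null || raw.isBlank()) {
            return refs;
        }
        for (String ref : raw.split(";")) {
            Map<String, String> fields = new LinkedHashMap<>();
            for (String pair : ref.split(",")) {
                int colon = pair.indexOf(':');
                fields.put(pair.substring(0, colon).trim(), pair.substring(colon + 1).trim());
            }
            refs.add(fields);
        }
        return refs;
    }
}
```

{{ownerReferences(withOwnerReference(spec, ...))}} yields one reference. {{ownerReferences(spec)}}, which models a config that never had the option set (like the observe config in the bug path), yields an empty list. {{spec}} itself is left untouched.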
> Code:
>  * 
> [{{SessionReconciler.deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L98-L111]
>  sets ownerRef at 
> [L106|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L106]
>  before [L107 submit|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L107]
>  * 
> [{{SessionReconciler.recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
>  submits at 
> [L124|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]
>  without 
> [{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
>  * 
> [{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
>  only mutates the per-call {{Configuration}}; it is *never persisted* to 
> {{spec.flinkConfiguration}}, so
> [{{getObserveConfig()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java#L343-L352]
>  never carries it
>  * 
> [{{ApplicationReconciler}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L270-L316]
>  is unaffected: its recovery routes through 
> [{{AbstractJobReconciler.restoreJob() → deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java#L373-L398],
>  and 
> [{{ApplicationReconciler.deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L141-L211]
>  calls 
> [{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L169]
>  at L169
>  * The gap dates back to FLINK-28979 / [commit {{62eb68c8}}|https://github.com/apache/flink-kubernetes-operator/commit/62eb68c812713378f7f66fde01cf14370a2252e4]
>  (Oct 2022); the test 
> [{{SessionReconcilerTest#testSetOwnerReference}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java#L130-L149]
>  only exercises 
> [{{deploy()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L98-L111],
>  not 
> [{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
>  * Still present on 
> [{{main}}|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
>  and in every release tag from 
> [{{release-1.10.0}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.10.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L116-L121]
>  through 
> [{{release-1.14.0}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]
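The missing coverage could be closed with a check that exercises both entry points and inspects what was actually submitted. The sketch below is a hypothetical, framework-free illustration. {{RecordingSessionService}} and the entry-point methods are invented stand-ins for a test double and for {{deploy()}} / {{recoverSession()}}, not the operator's test utilities.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, framework-free regression check; not the operator's test classes.
final class OwnerRefRegressionSketch {
    static final String OWNER_REF_KEY = "kubernetes.jobmanager.owner.reference";

    private OwnerRefRegressionSketch() {}

    // Test double that records every config submitted for a session cluster.
    static final class RecordingSessionService {
        final List<Map<String, String>> submitted = new ArrayList<>();

        void submitSessionCluster(Map<String, String> conf) {
            submitted.add(Map.copyOf(conf));
        }
    }

    // Per-call copy with the owner reference added; the spec map is never mutated.
    private static Map<String, String> withOwnerRef(Map<String, String> spec, String uid) {
        Map<String, String> conf = new HashMap<>(spec);
        conf.put(OWNER_REF_KEY, "kind:FlinkDeployment,uid:" + uid);
        return conf;
    }

    // Models deploy(): sets the owner reference before submitting.
    static void deploy(RecordingSessionService svc, Map<String, String> spec, String uid) {
        svc.submitSessionCluster(withOwnerRef(spec, uid));
    }

    // Models recoverSession() as described for release-1.14.0: submits the observe config as-is.
    static void recoverSessionBuggy(RecordingSessionService svc, Map<String, String> spec) {
        svc.submitSessionCluster(new HashMap<>(spec));
    }

    // Models recoverSession() with the proposed fix applied.
    static void recoverSessionFixed(RecordingSessionService svc, Map<String, String> spec, String uid) {
        svc.submitSessionCluster(withOwnerRef(spec, uid));
    }

    // True when the most recent submission carries the owner-reference option.
    static boolean lastSubmissionHasOwnerRef(RecordingSessionService svc) {
        return !svc.submitted.isEmpty()
                && svc.submitted.get(svc.submitted.size() - 1).containsKey(OWNER_REF_KEY);
    }
}
```

A check that covers only {{deploy()}} passes against the buggy recovery path. Asserting on the recovery submission is what would have caught this gap.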
> h2. Logs
> h3. First MISSING + entry into {{recoverSession()}}
> The observer reports {{MISSING}} and the bug site is invoked. Note {{Previous status: READY}}: the JM Deployment was healthy moments before:
> {noformat}
> 2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver  [ERROR] REST service in session cluster timed out
> 2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver  [INFO ] Observing JobManager deployment. Previous status: READY
> 2026-04-27 09:29:18,667 o.a.f.k.o.o.d.SessionObserver  [ERROR] Missing JobManager deployment
> 2026-04-27 09:29:18,701 o.a.f.k.o.l.AuditUtils         [INFO ] >>> Event[Job] | Warning | MISSING | Missing JobManager deployment
> 2026-04-27 09:29:18,882 o.a.f.k.o.s.NativeFlinkService [INFO ] Deploying session cluster
> 2026-04-27 09:29:19,118 o.a.f.k.KubernetesClusterDescr [INFO ] Create flink session cluster flink-deployment successfully, JobManager Web Interface: http://flink-deployment-rest.temporal:8081
> {noformat}
> The Deployment is created, but kubectl confirmed immediately afterwards that 
> {{metadata.ownerReferences}} is empty, so JOSDK cannot link it back to the 
> FlinkDeployment CR.
> h3. Second cycle: still MISSING, now hits 409 AlreadyExists
> {noformat}
> 2026-04-27 09:29:19,426 o.a.f.k.o.o.d.SessionObserver  [INFO ] Observing JobManager deployment. Previous status: DEPLOYING
> 2026-04-27 09:29:19,426 o.a.f.k.o.o.d.SessionObserver  [ERROR] Missing JobManager deployment
> 2026-04-27 09:29:19,475 o.a.f.k.o.l.AuditUtils         [INFO ] >>> Event[Job] | Warning | MISSING | Missing JobManager deployment
> 2026-04-27 09:29:19,755 o.a.f.k.o.s.NativeFlinkService [INFO ] Deploying session cluster
> 2026-04-27 09:29:19,828 o.a.f.k.KubernetesClusterDescr [WARN ] Failed to create the Kubernetes cluster "flink-deployment", try to clean up the residual resources.
> 2026-04-27 09:29:19,910 o.a.f.k.o.l.AuditUtils         [INFO ] >>> Event[Job] | Warning | ERROR | Could not create Kubernetes cluster "flink-deployment". -> Failure executing: POST at: https://10.100.0.1:443/apis/apps/v1/namespaces/temporal/deployments. Message: deployments.apps "flink-deployment" already exists. Received status: ... reason=AlreadyExists, status=Failure ...
> {noformat}
> The catch path runs 
> [{{stopAndCleanupCluster()}}|https://github.com/apache/flink/blob/release-1.20.1/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java#L307],
>  which deletes the orphan, and the next cycle creates a new orphan. The full 
> stack trace pinpoints [{{recoverSession()}} line 124|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L124]:
> {code:java}
> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
>   org.apache.flink.client.deployment.ClusterDeploymentException:
>     Could not create Kubernetes cluster "flink-deployment".
>   at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitSessionCluster(AbstractFlinkService.java:232)
>   at org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler.recoverSession(SessionReconciler.java:124)
>   at org.apache.flink.kubernetes.operator.reconciler.deployment.SessionReconciler.reconcileOtherChanges(SessionReconciler.java:117)
>   ...
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
>   Failure executing: POST at: .../apis/apps/v1/namespaces/temporal/deployments.
>   Message: deployments.apps "flink-deployment" already exists. ... reason=AlreadyExists, status=Failure ...
>  {code}
> h3. Steady-state: MISSING / AlreadyExists every ~5–10 s
> In the captured incident the loop ran for 38 minutes, producing *410 
> {{Missing JobManager deployment}} log lines* (about one every 5.6 s on 
> average) and matching {{AlreadyExists}} events until manual intervention.
> h3. kubectl evidence — the {{flink}} manager wrote with no ownerReferences
> {{metadata.managedFields}} is the smoking gun. The operator's manager 
> ({{flink}}) has no {{f:ownerReferences}} in its tracked fields; only the 
> manual {{kubectl patch}} (3 seconds later) wrote them:
> {noformat}
> $ kubectl get deployment flink-deployment -n temporal -o json --show-managed-fields \
>   | jq '[.metadata.managedFields[] | {manager, operation, time, ownerRef: (.fieldsV1["f:metadata"]["f:ownerReferences"] // null)}]'
> [
>   {
>     "manager": "flink",
>     "operation": "Update",
>     "time": "2026-04-27T10:07:42Z",
>     "ownerRef": null                       ← bug: operator wrote NO ownerReferences
>   },
>   {
>     "manager": "kubectl-patch",
>     "operation": "Update",
>     "time": "2026-04-27T10:07:45Z",        ← runbook fix applied 3 s later
>     "ownerRef": {
>       ".": {},
>       "k:{\"uid\":\"bd14c6f9-1f8c-4c07-ab9f-2aec7b106b59\"}": {}
>     }
>   }
> ]{noformat}
> For comparison, a healthy sibling cluster that never hit the bug shows the 
> {{flink}} manager itself writing {{ownerReferences}}:
> {noformat}
> $ kubectl get deployment flink-deployment -n temporal -o json --show-managed-fields \
>   | jq '[.metadata.managedFields[] | {manager, operation, time, ownerRef: (.fieldsV1["f:metadata"]["f:ownerReferences"] // null)}]'
> [
>   {
>     "manager": "flink",
>     "operation": "Update",
>     "time": "2026-04-23T18:30:56Z",
>     "ownerRef": {                          ← happy path: deploy() set ownerReferences
>       ".": {},
>       "k:{\"uid\":\"6274be7f-81f1-4ebe-8816-8d937da647fd\"}": {}
>     }
>   },
>   {
>     "manager": "kube-controller-manager",
>     "operation": "Update",
>     ...
>   }
> ]{noformat}
> h2. Real-world incident + workaround
> Hit on a session cluster with HA enabled (Apache Flink Kubernetes Operator 
> [{{release-1.14.0}}|https://github.com/apache/flink-kubernetes-operator/tree/release-1.14.0],
>  image {{ghcr.io/apache/flink-kubernetes-operator:f504138}}, Flink 
> 1.19.2). The trigger was an unrelated transient REST timeout reported by 
> [{{SessionObserver}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java#L45]
>  coinciding with the [JOSDK secondary lookup|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java#L115-L116]
>  returning empty. Recovery ran, the recreated Deployment had no 
> ownerReferences, and the cluster stayed stuck in the loop until the 
> ownerReferences were patched manually with:
> {code:bash}
> FLINK_UID=$(kubectl get flinkdeployment <name> -n <ns> -o jsonpath='{.metadata.uid}')
> kubectl patch deployment <name> -n <ns> --type=merge \
>   -p "{\"metadata\":{\"ownerReferences\":[{\"apiVersion\":\"flink.apache.org/v1beta1\",\"kind\":\"FlinkDeployment\",\"name\":\"<name>\",\"uid\":\"$FLINK_UID\",\"controller\":true,\"blockOwnerDeletion\":true}]}}"
>  {code}
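The escaped string in the {{-p}} argument is easy to get wrong by hand. As a hedged illustration, the hypothetical helper below (not part of any operator or kubectl API) assembles the same JSON merge-patch body, which makes the structure of the owner reference easier to read.

```java
// Hypothetical helper: builds the same merge-patch body as the kubectl command above.
// Plain string formatting; it does not JSON-escape or validate its inputs.
final class OwnerRefPatch {
    private OwnerRefPatch() {}

    static String mergePatch(String crName, String crUid) {
        return String.format(
                "{\"metadata\":{\"ownerReferences\":[{"
                        + "\"apiVersion\":\"flink.apache.org/v1beta1\","
                        + "\"kind\":\"FlinkDeployment\","
                        + "\"name\":\"%s\","
                        + "\"uid\":\"%s\","
                        + "\"controller\":true,"
                        + "\"blockOwnerDeletion\":true}]}}",
                crName, crUid);
    }
}
```

A JSON merge patch (RFC 7386) replaces arrays wholesale, so applying this body overwrites whatever {{ownerReferences}} list the Deployment already has. For the orphan created by {{recoverSession()}} that list is empty, so nothing is lost.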
> h2. Possible fix
> Add the missing 
> [{{setOwnerReference()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java#L543-L554]
>  call in 
> [{{recoverSession()}}|https://github.com/apache/flink-kubernetes-operator/blob/release-1.14.0/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java#L123-L128]:
> {code:java}
> private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Exception {
>     var conf = ctx.getObserveConfig();
>     // <-- add: same call deploy() makes before submitting, so the recreated
>     // Deployment carries ownerReferences and JOSDK can link it back to the CR
>     setOwnerReference(ctx.getResource(), conf);
>     ctx.getFlinkService().submitSessionCluster(conf);
>     ctx.getResource().getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
