This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new e3e0cbe4 [FLINK-39559] Set ownerReferences on JM Deployment recreated during session recovery
e3e0cbe4 is described below

commit e3e0cbe42f998248c11edc88628d4da4e63046d6
Author: Krzysztof Adrian Palka <[email protected]>
AuthorDate: Tue Apr 28 16:42:59 2026 +0200

    [FLINK-39559] Set ownerReferences on JM Deployment recreated during session recovery
---
 .../reconciler/deployment/SessionReconciler.java   |  9 ++++-
 .../deployment/SessionReconcilerTest.java          | 47 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index d7ad7492..c38afce0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -121,7 +121,14 @@ public class SessionReconciler
     }
 
     private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Exception {
-        ctx.getFlinkService().submitSessionCluster(ctx.getObserveConfig());
+        var conf = ctx.getObserveConfig();
+        // Owner references must be re-applied on the deploy config used for recovery.
+        // Without this, the recreated JobManager Deployment is created with no
+        // ownerReferences, which prevents JOSDK from linking it back to the
+        // FlinkDeployment via getSecondaryResource(), leading to an unrecoverable
+        // MISSING / AlreadyExists loop.
+        setOwnerReference(ctx.getResource(), conf);
+        ctx.getFlinkService().submitSessionCluster(conf);
         ctx.getResource()
                 .getStatus()
                 .setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index ba159918..2a410e71 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -149,6 +149,53 @@ public class SessionReconcilerTest extends OperatorTestBase {
         Assertions.assertEquals(expectedOwnerReferences, or);
     }
 
+    /**
+     * Regression test: the recovery path (recoverSession) must populate
+     * KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE on the configuration used to recreate the
+     * JobManager Deployment. Otherwise the recreated Deployment has no ownerReferences and JOSDK
+     * cannot link it back to the FlinkDeployment via getSecondaryResource(), causing an
+     * unrecoverable MISSING / AlreadyExists loop.
+     */
+    @Test
+    public void testRecoverSessionSetsOwnerReference() throws Exception {
+        var capturedConfigs = new java.util.ArrayList<Configuration>();
+        flinkService =
+                new TestingFlinkService(kubernetesClient) {
+                    @Override
+                    public void submitSessionCluster(Configuration conf) throws Exception {
+                        capturedConfigs.add(new Configuration(conf));
+                        super.submitSessionCluster(conf);
+                    }
+                };
+
+        FlinkDeployment deployment = TestUtils.buildSessionCluster();
+        // Initial deploy goes through SessionReconciler.deploy() which already sets ownerRef.
+        reconciler.reconcile(deployment, flinkService.getContext());
+        assertEquals(1, capturedConfigs.size());
+
+        // Simulate the JM Deployment going missing (e.g. node failure / informer cache miss).
+        // shouldRecoverDeployment() requires HA to be enabled (it is by default for the test
+        // session cluster) and the JM status to be MISSING for a non-terminal deployment.
+        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+        capturedConfigs.clear();
+
+        reconciler.reconcile(deployment, flinkService.getContext());
+
+        assertEquals(
+                1,
+                capturedConfigs.size(),
+                "recoverSession should have invoked submitSessionCluster exactly once");
+        Configuration recoverConfig = capturedConfigs.get(0);
+        List<Map<String, String>> ownerRefs =
+                recoverConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
+        Assertions.assertEquals(
+                List.of(TestUtils.generateTestOwnerReferenceMap(deployment)),
+                ownerRefs,
+                "recoverSession must populate JOB_MANAGER_OWNER_REFERENCE so the recreated"
+                        + " JobManager Deployment carries ownerReferences linking it to the"
+                        + " FlinkDeployment CR");
+    }
+
     @Test
     public void testGetNonTerminalJobs() throws Exception {
         FlinkDeployment deployment = TestUtils.buildSessionCluster();

Reply via email to