This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new e3e0cbe4 [FLINK-39559] Set ownerReferences on JM Deployment recreated during session recovery
e3e0cbe4 is described below
commit e3e0cbe42f998248c11edc88628d4da4e63046d6
Author: Krzysztof Adrian Palka <[email protected]>
AuthorDate: Tue Apr 28 16:42:59 2026 +0200
[FLINK-39559] Set ownerReferences on JM Deployment recreated during session recovery
---
.../reconciler/deployment/SessionReconciler.java | 9 ++++-
.../deployment/SessionReconcilerTest.java | 47 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index d7ad7492..c38afce0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -121,7 +121,14 @@ public class SessionReconciler
}
private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx)
throws Exception {
- ctx.getFlinkService().submitSessionCluster(ctx.getObserveConfig());
+ var conf = ctx.getObserveConfig();
+ // Owner references must be re-applied on the deploy config used for
recovery.
+ // Without this, the recreated JobManager Deployment is created with no
+ // ownerReferences, which prevents JOSDK from linking it back to the
+ // FlinkDeployment via getSecondaryResource(), leading to an
unrecoverable
+ // MISSING / AlreadyExists loop.
+ setOwnerReference(ctx.getResource(), conf);
+ ctx.getFlinkService().submitSessionCluster(conf);
ctx.getResource()
.getStatus()
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index ba159918..2a410e71 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -149,6 +149,53 @@ public class SessionReconcilerTest extends OperatorTestBase {
Assertions.assertEquals(expectedOwnerReferences, or);
}
+ /**
+ * Regression test: the recovery path (recoverSession) must populate
+ * KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE on the configuration used to recreate the
+ * JobManager Deployment. Otherwise the recreated Deployment has no ownerReferences and JOSDK
+ * cannot link it back to the FlinkDeployment via getSecondaryResource(), causing an
+ * unrecoverable MISSING / AlreadyExists loop.
+ *
+ * <p>Flow: capture every Configuration handed to submitSessionCluster, reconcile once for the
+ * initial deploy, mark the JobManager Deployment MISSING, reconcile again, and assert that the
+ * single recovery submission carries the owner reference generated for the deployment.
+ */
+ @Test
+ public void testRecoverSessionSetsOwnerReference() throws Exception {
+ // Copies of every Configuration passed to submitSessionCluster, in call order.
+ var capturedConfigs = new java.util.ArrayList<Configuration>();
+ flinkService =
+ new TestingFlinkService(kubernetesClient) {
+ @Override
+ public void submitSessionCluster(Configuration conf)
throws Exception {
+ // Store a copy rather than the passed-in object so the assertions below
+ // inspect the config as it was at submission time.
+ capturedConfigs.add(new Configuration(conf));
+ super.submitSessionCluster(conf);
+ }
+ };
+
+ FlinkDeployment deployment = TestUtils.buildSessionCluster();
+ // Initial deploy goes through SessionReconciler.deploy() which already sets ownerRef.
+ reconciler.reconcile(deployment, flinkService.getContext());
+ assertEquals(1, capturedConfigs.size());
+
+ // Simulate the JM Deployment going missing (e.g. node failure / informer cache miss).
+ // shouldRecoverDeployment() requires HA to be enabled (it is by default for the test
+ // session cluster) and the JM status to be MISSING for a non-terminal deployment.
+
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+ // Drop the initial deploy's config so only the recovery submission is inspected below.
+ capturedConfigs.clear();
+
+ reconciler.reconcile(deployment, flinkService.getContext());
+
+ assertEquals(
+ 1,
+ capturedConfigs.size(),
+ "recoverSession should have invoked submitSessionCluster
exactly once");
+ Configuration recoverConfig = capturedConfigs.get(0);
+ List<Map<String, String>> ownerRefs =
+
recoverConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
+ Assertions.assertEquals(
+ List.of(TestUtils.generateTestOwnerReferenceMap(deployment)),
+ ownerRefs,
+ "recoverSession must populate JOB_MANAGER_OWNER_REFERENCE so
the recreated"
+ + " JobManager Deployment carries ownerReferences
linking it to the"
+ + " FlinkDeployment CR");
+ }
+
@Test
public void testGetNonTerminalJobs() throws Exception {
FlinkDeployment deployment = TestUtils.buildSessionCluster();