This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 4bac805  [FLINK-26432] Cleanly separate validator, observer and reconciler modules
4bac805 is described below

commit 4bac8059ec5d3c68da19beec22f29c434562516e
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Mon Feb 28 06:17:11 2022 +0100

    [FLINK-26432] Cleanly separate validator, observer and reconciler modules
    
    closes #26
---
 .../flink/kubernetes/operator/FlinkOperator.java   |   3 +
 .../controller/FlinkDeploymentController.java      |  20 ++-
 .../operator/crd/status/FlinkDeploymentStatus.java |   6 +-
 .../JobManagerDeploymentStatus.java                |   4 +-
 .../operator/observer/JobStatusObserver.java       | 115 ---------------
 .../kubernetes/operator/observer/Observer.java     | 161 +++++++++++++++++++++
 .../operator/reconciler/BaseReconciler.java        |  74 +---------
 .../operator/reconciler/JobReconciler.java         |  27 +---
 .../operator/reconciler/SessionReconciler.java     |   7 -
 .../flink/kubernetes/operator/TestUtils.java       |  17 +++
 .../kubernetes/operator/TestingFlinkService.java   |   7 +-
 .../controller/FlinkDeploymentControllerTest.java  |  47 +++---
 .../operator/observer/JobStatusObserverTest.java   |  77 ----------
 .../kubernetes/operator/observer/ObserverTest.java | 135 +++++++++++++++++
 .../operator/reconciler/JobReconcilerTest.java     |   4 +-
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |   7 +
 16 files changed, 394 insertions(+), 317 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 8fa72b9..fd9d60c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -21,6 +21,7 @@ import 
org.apache.flink.kubernetes.operator.config.DefaultConfig;
 import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
 import 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
+import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -55,6 +56,7 @@ public class FlinkOperator {
 
         FlinkService flinkService = new FlinkService(client);
 
+        Observer observer = new Observer(flinkService);
         JobReconciler jobReconciler = new JobReconciler(client, flinkService);
         SessionReconciler sessionReconciler = new SessionReconciler(client, 
flinkService);
 
@@ -66,6 +68,7 @@ public class FlinkOperator {
                         client,
                         namespace,
                         validator,
+                        observer,
                         jobReconciler,
                         sessionReconciler);
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 5dff8f4..b9d5cd4 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import 
org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
@@ -64,6 +65,7 @@ public class FlinkDeploymentController
     private final String operatorNamespace;
 
     private final FlinkDeploymentValidator validator;
+    private final Observer observer;
     private final JobReconciler jobReconciler;
     private final SessionReconciler sessionReconciler;
     private final DefaultConfig defaultConfig;
@@ -73,12 +75,14 @@ public class FlinkDeploymentController
             KubernetesClient kubernetesClient,
             String operatorNamespace,
             FlinkDeploymentValidator validator,
+            Observer observer,
             JobReconciler jobReconciler,
             SessionReconciler sessionReconciler) {
         this.defaultConfig = defaultConfig;
         this.kubernetesClient = kubernetesClient;
         this.operatorNamespace = operatorNamespace;
         this.validator = validator;
+        this.observer = observer;
         this.jobReconciler = jobReconciler;
         this.sessionReconciler = sessionReconciler;
     }
@@ -86,12 +90,12 @@ public class FlinkDeploymentController
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
         LOG.info("Stopping cluster {}", flinkApp.getMetadata().getName());
+        Configuration effectiveConfig =
+                FlinkUtils.getEffectiveConfig(flinkApp, 
defaultConfig.getFlinkConfig());
+
+        observer.observe(flinkApp, context, effectiveConfig);
         return getReconciler(flinkApp)
-                .shutdownAndDelete(
-                        operatorNamespace,
-                        flinkApp,
-                        context,
-                        FlinkUtils.getEffectiveConfig(flinkApp, 
defaultConfig.getFlinkConfig()));
+                .shutdownAndDelete(operatorNamespace, flinkApp, 
effectiveConfig);
     }
 
     @Override
@@ -107,6 +111,12 @@ public class FlinkDeploymentController
 
         Configuration effectiveConfig =
                 FlinkUtils.getEffectiveConfig(flinkApp, 
defaultConfig.getFlinkConfig());
+
+        boolean readyToReconcile = observer.observe(flinkApp, context, 
effectiveConfig);
+        if (!readyToReconcile) {
+            return 
flinkApp.getStatus().getJobManagerDeploymentStatus().toUpdateControl(flinkApp);
+        }
+
         try {
             UpdateControl<FlinkDeployment> updateControl =
                     getReconciler(flinkApp)
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
index 928c827..e192163 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/FlinkDeploymentStatus.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.kubernetes.operator.crd.status;
 
+import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -26,6 +28,8 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor
 @AllArgsConstructor
 public class FlinkDeploymentStatus {
-    private JobStatus jobStatus;
+    private JobStatus jobStatus = new JobStatus();
+    private JobManagerDeploymentStatus jobManagerDeploymentStatus =
+            JobManagerDeploymentStatus.MISSING;
     private ReconciliationStatus reconciliationStatus = new 
ReconciliationStatus();
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobManagerDeploymentStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
similarity index 97%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobManagerDeploymentStatus.java
rename to 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
index 5e634a9..08262ff 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobManagerDeploymentStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.reconciler;
+package org.apache.flink.kubernetes.operator.observer;
 
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 
@@ -44,12 +44,12 @@ public enum JobManagerDeploymentStatus {
     public UpdateControl<FlinkDeployment> toUpdateControl(FlinkDeployment 
flinkDeployment) {
         switch (this) {
             case DEPLOYING:
+            case READY:
                 return UpdateControl.updateStatus(flinkDeployment)
                         .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
             case DEPLOYED_NOT_READY:
                 return UpdateControl.updateStatus(flinkDeployment)
                         .rescheduleAfter(PORT_READY_DELAY_SECONDS, 
TimeUnit.SECONDS);
-            case READY:
             case MISSING:
             default:
                 return null;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
deleted file mode 100644
index 577d73b..0000000
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.observer;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
-import org.apache.flink.kubernetes.operator.crd.spec.JobState;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.runtime.client.JobStatusMessage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/** Observes the actual state of the running jobs on the Flink cluster. */
-public class JobStatusObserver {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(JobStatusObserver.class);
-
-    private final FlinkService flinkService;
-
-    public JobStatusObserver(FlinkService flinkService) {
-        this.flinkService = flinkService;
-    }
-
-    public boolean observeFlinkJobStatus(FlinkDeployment flinkApp, 
Configuration effectiveConfig)
-            throws Exception {
-        FlinkDeploymentSpec lastReconciledSpec =
-                
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
-
-        if (lastReconciledSpec == null) {
-            // This is the first run, nothing to observe
-            return true;
-        }
-
-        JobSpec jobSpec = lastReconciledSpec.getJob();
-
-        if (jobSpec == null) {
-            // This is a session cluster, nothing to observe
-            return true;
-        }
-
-        if (!jobSpec.getState().equals(JobState.RUNNING)) {
-            // The job is not running, nothing to observe
-            return true;
-        }
-        LOG.info("Getting job statuses for {}", 
flinkApp.getMetadata().getName());
-        FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
-
-        Collection<JobStatusMessage> clusterJobStatuses = 
flinkService.listJobs(effectiveConfig);
-        if (clusterJobStatuses.isEmpty()) {
-            LOG.info("No jobs found on {} yet", 
flinkApp.getMetadata().getName());
-            return false;
-        } else {
-            flinkAppStatus.setJobStatus(
-                    mergeJobStatus(
-                            flinkAppStatus.getJobStatus(), new 
ArrayList<>(clusterJobStatuses)));
-            LOG.info("Job statuses updated for {}", 
flinkApp.getMetadata().getName());
-            return true;
-        }
-    }
-
-    /** Merge previous job status with the new one from the flink job cluster. 
*/
-    private JobStatus mergeJobStatus(
-            JobStatus oldStatus, List<JobStatusMessage> clusterJobStatuses) {
-        JobStatus newStatus = oldStatus;
-        Collections.sort(
-                clusterJobStatuses,
-                (j1, j2) -> -1 * Long.compare(j1.getStartTime(), 
j2.getStartTime()));
-        JobStatusMessage newJob = clusterJobStatuses.get(0);
-
-        if (newStatus == null) {
-            newStatus = createJobStatus(newJob);
-        } else {
-            newStatus.setState(newJob.getJobState().name());
-            newStatus.setJobName(newJob.getJobName());
-            newStatus.setJobId(newJob.getJobId().toHexString());
-            // track the start time, changing timestamp would cause busy 
reconciliation
-            newStatus.setUpdateTime(String.valueOf(newJob.getStartTime()));
-        }
-        return newStatus;
-    }
-
-    public static JobStatus createJobStatus(JobStatusMessage message) {
-        JobStatus jobStatus = new JobStatus();
-        jobStatus.setJobId(message.getJobId().toHexString());
-        jobStatus.setJobName(message.getJobName());
-        jobStatus.setState(message.getJobState().name());
-        jobStatus.setUpdateTime(String.valueOf(message.getStartTime()));
-        return jobStatus;
-    }
-}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
new file mode 100644
index 0000000..2772fbb
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class Observer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
+
+    private final FlinkService flinkService;
+
+    public Observer(FlinkService flinkService) {
+        this.flinkService = flinkService;
+    }
+
+    public boolean observe(
+            FlinkDeployment flinkApp, Context context, Configuration 
effectiveConfig) {
+        observeJmDeployment(flinkApp, context, effectiveConfig);
+        return isReadyToReconcile(flinkApp, effectiveConfig);
+    }
+
+    private void observeJmDeployment(
+            FlinkDeployment flinkApp, Context context, Configuration 
effectiveConfig) {
+        FlinkDeploymentStatus deploymentStatus = flinkApp.getStatus();
+        JobManagerDeploymentStatus previousJmStatus =
+                deploymentStatus.getJobManagerDeploymentStatus();
+
+        if (JobManagerDeploymentStatus.READY == previousJmStatus) {
+            return;
+        }
+
+        if (JobManagerDeploymentStatus.DEPLOYED_NOT_READY == previousJmStatus) 
{
+            
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+            return;
+        }
+
+        Optional<Deployment> deployment = 
context.getSecondaryResource(Deployment.class);
+        if (deployment.isPresent()) {
+            DeploymentStatus status = deployment.get().getStatus();
+            DeploymentSpec spec = deployment.get().getSpec();
+            if (status != null
+                    && status.getAvailableReplicas() != null
+                    && spec.getReplicas().intValue() == status.getReplicas()
+                    && spec.getReplicas().intValue() == 
status.getAvailableReplicas()
+                    && flinkService.isJobManagerPortReady(effectiveConfig)) {
+
+                // typically it takes a few seconds for the REST server to be 
ready
+                LOG.info(
+                        "JobManager deployment {} in namespace {} port ready, 
waiting for the REST API...",
+                        flinkApp.getMetadata().getName(),
+                        flinkApp.getMetadata().getNamespace());
+                deploymentStatus.setJobManagerDeploymentStatus(
+                        JobManagerDeploymentStatus.DEPLOYED_NOT_READY);
+                return;
+            }
+            LOG.info(
+                    "JobManager deployment {} in namespace {} exists but not 
ready yet, status {}",
+                    flinkApp.getMetadata().getName(),
+                    flinkApp.getMetadata().getNamespace(),
+                    status);
+
+            
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+            return;
+        }
+
+        
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+    }
+
+    private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, 
Configuration effectiveConfig) {
+
+        // No need to observe job status for session clusters
+        if (flinkApp.getSpec().getJob() == null) {
+            return true;
+        }
+
+        LOG.info("Getting job statuses for {}", 
flinkApp.getMetadata().getName());
+        FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
+
+        Collection<JobStatusMessage> clusterJobStatuses;
+        try {
+            clusterJobStatuses = flinkService.listJobs(effectiveConfig);
+        } catch (Exception e) {
+            LOG.error("Exception while listing jobs", e);
+            flinkAppStatus.getJobStatus().setState("UNKNOWN");
+            return false;
+        }
+        if (clusterJobStatuses.isEmpty()) {
+            LOG.info("No jobs found on {} yet", 
flinkApp.getMetadata().getName());
+            return false;
+        } else {
+            updateJobStatus(flinkAppStatus.getJobStatus(), new 
ArrayList<>(clusterJobStatuses));
+            LOG.info("Job statuses updated for {}", 
flinkApp.getMetadata().getName());
+            return true;
+        }
+    }
+
+    private boolean isReadyToReconcile(FlinkDeployment flinkApp, Configuration 
effectiveConfig) {
+        JobManagerDeploymentStatus jmDeploymentStatus =
+                flinkApp.getStatus().getJobManagerDeploymentStatus();
+
+        switch (jmDeploymentStatus) {
+            case READY:
+                return observeFlinkJobStatus(flinkApp, effectiveConfig);
+            case MISSING:
+                return true;
+            case DEPLOYING:
+            case DEPLOYED_NOT_READY:
+                return false;
+            default:
+                throw new RuntimeException("Unknown status: " + 
jmDeploymentStatus);
+        }
+    }
+
+    /** Update previous job status based on the job list from the cluster. */
+    private void updateJobStatus(JobStatus status, List<JobStatusMessage> 
clusterJobStatuses) {
+        Collections.sort(
+                clusterJobStatuses, (j1, j2) -> 
Long.compare(j2.getStartTime(), j1.getStartTime()));
+        JobStatusMessage newJob = clusterJobStatuses.get(0);
+
+        status.setState(newJob.getJobState().name());
+        status.setJobName(newJob.getJobName());
+        status.setJobId(newJob.getJobId().toHexString());
+        // track the start time, changing timestamp would cause busy 
reconciliation
+        status.setUpdateTime(String.valueOf(newJob.getStartTime()));
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
index d6cbb29..b94956b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
@@ -19,13 +19,11 @@ package org.apache.flink.kubernetes.operator.reconciler;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
-import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
@@ -33,9 +31,6 @@ import 
io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.Optional;
-
 /** BaseReconciler with functionality that is common to job and session modes. 
*/
 public abstract class BaseReconciler {
 
@@ -44,8 +39,6 @@ public abstract class BaseReconciler {
     public static final int REFRESH_SECONDS = 60;
     public static final int PORT_READY_DELAY_SECONDS = 10;
 
-    private final HashSet<String> jobManagerDeployments = new HashSet<>();
-
     protected final KubernetesClient kubernetesClient;
     protected final FlinkService flinkService;
 
@@ -54,10 +47,6 @@ public abstract class BaseReconciler {
         this.flinkService = flinkService;
     }
 
-    public boolean removeDeployment(FlinkDeployment flinkApp) {
-        return jobManagerDeployments.remove(flinkApp.getMetadata().getUid());
-    }
-
     public abstract UpdateControl<FlinkDeployment> reconcile(
             String operatorNamespace,
             FlinkDeployment flinkApp,
@@ -65,70 +54,15 @@ public abstract class BaseReconciler {
             Configuration effectiveConfig)
             throws Exception;
 
-    protected JobManagerDeploymentStatus checkJobManagerDeployment(
-            FlinkDeployment flinkApp, Context context, Configuration 
effectiveConfig) {
-        if (!jobManagerDeployments.contains(flinkApp.getMetadata().getUid())) {
-            Optional<Deployment> deployment = 
context.getSecondaryResource(Deployment.class);
-            if (deployment.isPresent()) {
-                DeploymentStatus status = deployment.get().getStatus();
-                DeploymentSpec spec = deployment.get().getSpec();
-                if (status != null
-                        && status.getAvailableReplicas() != null
-                        && spec.getReplicas().intValue() == 
status.getReplicas()
-                        && spec.getReplicas().intValue() == 
status.getAvailableReplicas()) {
-                    // typically it takes a few seconds for the REST server to 
be ready
-                    if (flinkService.isJobManagerPortReady(effectiveConfig)) {
-                        LOG.info(
-                                "JobManager deployment {} in namespace {} is 
ready",
-                                flinkApp.getMetadata().getName(),
-                                flinkApp.getMetadata().getNamespace());
-                        
jobManagerDeployments.add(flinkApp.getMetadata().getUid());
-                        if (flinkApp.getStatus().getJobStatus() != null) {
-                            // pre-existing deployments on operator restart - 
proceed with
-                            // reconciliation
-                            return JobManagerDeploymentStatus.READY;
-                        }
-                    }
-                    LOG.info(
-                            "JobManager deployment {} in namespace {} port not 
ready",
-                            flinkApp.getMetadata().getName(),
-                            flinkApp.getMetadata().getNamespace());
-                    return JobManagerDeploymentStatus.DEPLOYED_NOT_READY;
-                }
-                LOG.info(
-                        "JobManager deployment {} in namespace {} not yet 
ready, status {}",
-                        flinkApp.getMetadata().getName(),
-                        flinkApp.getMetadata().getNamespace(),
-                        status);
-
-                return JobManagerDeploymentStatus.DEPLOYING;
-            }
-            return JobManagerDeploymentStatus.MISSING;
-        }
-        return JobManagerDeploymentStatus.READY;
-    }
-
-    /**
-     * Shuts down the job and deletes all kubernetes resources including k8s 
HA resources. It will
-     * first perform a graceful shutdown (cancel) before deleting the data if 
that is unsuccessful
-     * in order to avoid leaking HA metadata to durable storage.
-     *
-     * <p>This feature is limited at the moment to cleaning up native 
kubernetes HA resources, other
-     * HA providers like ZK need to be cleaned up manually after deletion.
-     */
     public DeleteControl shutdownAndDelete(
-            String operatorNamespace,
-            FlinkDeployment flinkApp,
-            Context context,
-            Configuration effectiveConfig) {
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration 
effectiveConfig) {
 
-        if (checkJobManagerDeployment(flinkApp, context, effectiveConfig)
-                == JobManagerDeploymentStatus.READY) {
+        if (JobManagerDeploymentStatus.READY
+                == flinkApp.getStatus().getJobManagerDeploymentStatus()) {
             shutdown(flinkApp, effectiveConfig);
         } else {
             FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
         }
-        removeDeployment(flinkApp);
         IngressUtils.updateIngressRules(
                 flinkApp, effectiveConfig, operatorNamespace, 
kubernetesClient, true);
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index ee121e9..3abb860 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -25,7 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
@@ -48,11 +48,8 @@ public class JobReconciler extends BaseReconciler {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JobReconciler.class);
 
-    private final JobStatusObserver observer;
-
     public JobReconciler(KubernetesClient kubernetesClient, FlinkService 
flinkService) {
         super(kubernetesClient, flinkService);
-        this.observer = new JobStatusObserver(flinkService);
     }
 
     @Override
@@ -72,19 +69,7 @@ public class JobReconciler extends BaseReconciler {
                     Optional.ofNullable(jobSpec.getInitialSavepointPath()));
             IngressUtils.updateIngressRules(
                     flinkApp, effectiveConfig, operatorNamespace, 
kubernetesClient, false);
-        }
-
-        // wait until the deployment is ready
-        UpdateControl<FlinkDeployment> uc =
-                checkJobManagerDeployment(flinkApp, context, effectiveConfig)
-                        .toUpdateControl(flinkApp);
-        if (uc != null) {
-            return uc;
-        }
-
-        if (!observer.observeFlinkJobStatus(flinkApp, effectiveConfig)) {
-            return UpdateControl.updateStatus(flinkApp)
-                    .rescheduleAfter(REFRESH_SECONDS, TimeUnit.SECONDS);
+            return 
JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(flinkApp);
         }
 
         // TODO: following assumes that current job is running
@@ -174,15 +159,17 @@ public class JobReconciler extends BaseReconciler {
                         effectiveConfig);
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
         jobStatus.setState("suspended");
-        removeDeployment(flinkApp);
+        
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
         savepointOpt.ifPresent(jobStatus::setSavepointLocation);
         return savepointOpt;
     }
 
     @Override
     protected void shutdown(FlinkDeployment flinkApp, Configuration 
effectiveConfig) {
-        if (flinkApp.getStatus().getJobStatus() != null
-                && flinkApp.getStatus().getJobStatus().getJobId() != null) {
+        if (org.apache.flink.api.common.JobStatus.RUNNING
+                .name()
+                
.equalsIgnoreCase(flinkApp.getStatus().getJobStatus().getState())) {
+            LOG.info("Job is running, attempting graceful shutdown.");
             try {
                 flinkService.cancelJob(
                         
JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()),
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index e72de56..d63fdeb 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -60,13 +60,6 @@ public class SessionReconciler extends BaseReconciler {
                     flinkApp, effectiveConfig, operatorNamespace, 
kubernetesClient, false);
         }
 
-        UpdateControl<FlinkDeployment> uc =
-                checkJobManagerDeployment(flinkApp, context, effectiveConfig)
-                        .toUpdateControl(flinkApp);
-        if (uc != null) {
-            return uc;
-        }
-
         boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
         if (specChanged) {
             upgradeSessionCluster(flinkApp, effectiveConfig);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 9ada6f8..055d91b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -33,9 +33,12 @@ import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodSpec;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 /** Testing utilities. */
 public class TestUtils {
@@ -109,4 +112,18 @@ public class TestUtils {
         pod.setSpec(podSpec);
         return pod;
     }
+
+    public static Context createEmptyContext() {
+        return new Context() {
+            @Override
+            public Optional<RetryInfo> getRetryInfo() {
+                return Optional.empty();
+            }
+
+            @Override
+            public <T> Optional<T> getSecondaryResource(Class<T> aClass, 
String s) {
+                return Optional.empty();
+            }
+        };
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 610176b..2aab5ab 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -41,6 +41,7 @@ public class TestingFlinkService extends FlinkService {
 
     private List<Tuple2<String, JobStatusMessage>> jobs = new ArrayList<>();
     private Set<String> sessions = new HashSet<>();
+    private boolean isPortReady = true;
 
     public TestingFlinkService() {
         super(null);
@@ -104,6 +105,10 @@ public class TestingFlinkService extends FlinkService {
 
     @Override
     public boolean isJobManagerPortReady(Configuration config) {
-        return true;
+        return isPortReady;
+    }
+
+    public void setPortReady(boolean isPortReady) {
+        this.isPortReady = isPortReady;
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index ee7a386..f4468c6 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
-import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
-import 
org.apache.flink.kubernetes.operator.reconciler.JobManagerDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
@@ -60,20 +60,12 @@ public class FlinkDeploymentControllerTest {
 
         UpdateControl<FlinkDeployment> updateControl;
 
-        updateControl = testController.reconcile(appCluster, context);
+        updateControl = testController.reconcile(appCluster, 
TestUtils.createEmptyContext());
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                JobManagerDeploymentStatus.DEPLOYED_NOT_READY
-                        .toUpdateControl(appCluster)
-                        .getScheduleDelay(),
+                
JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(appCluster).getScheduleDelay(),
                 updateControl.getScheduleDelay());
 
-        updateControl = testController.reconcile(appCluster, context);
-        assertTrue(updateControl.isUpdateStatus());
-        assertEquals(
-                BaseReconciler.REFRESH_SECONDS * 1000,
-                (long) updateControl.getScheduleDelay().get());
-
         // Validate reconciliation status
         ReconciliationStatus reconciliationStatus =
                 appCluster.getStatus().getReconciliationStatus();
@@ -84,8 +76,16 @@ public class FlinkDeploymentControllerTest {
         updateControl = testController.reconcile(appCluster, context);
         assertTrue(updateControl.isUpdateStatus());
         assertEquals(
-                BaseReconciler.REFRESH_SECONDS * 1000,
-                (long) updateControl.getScheduleDelay().get());
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY
+                        .toUpdateControl(appCluster)
+                        .getScheduleDelay(),
+                updateControl.getScheduleDelay());
+
+        updateControl = testController.reconcile(appCluster, context);
+        assertTrue(updateControl.isUpdateStatus());
+        assertEquals(
+                
JobManagerDeploymentStatus.READY.toUpdateControl(appCluster).getScheduleDelay(),
+                updateControl.getScheduleDelay());
 
         // Validate job status
         JobStatus jobStatus = appCluster.getStatus().getJobStatus();
@@ -122,7 +122,7 @@ public class FlinkDeploymentControllerTest {
         appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
         appCluster.getSpec().getJob().setInitialSavepointPath("s0");
 
-        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, TestUtils.createEmptyContext());
         List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("s0", jobs.get(0).f0);
@@ -143,16 +143,20 @@ public class FlinkDeploymentControllerTest {
         jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("savepoint_0", jobs.get(0).f0);
+        testController.reconcile(appCluster, context);
 
         // Suspend job
         appCluster = TestUtils.clone(appCluster);
         appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
         testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.MISSING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
 
         // Resume from last savepoint
         appCluster = TestUtils.clone(appCluster);
         appCluster.getSpec().getJob().setState(JobState.RUNNING);
-        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, TestUtils.createEmptyContext());
         jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("savepoint_1", jobs.get(0).f0);
@@ -171,7 +175,7 @@ public class FlinkDeploymentControllerTest {
         appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
         appCluster.getSpec().getJob().setInitialSavepointPath("s0");
 
-        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, TestUtils.createEmptyContext());
         List<Tuple2<String, JobStatusMessage>> jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
         assertEquals("s0", jobs.get(0).f0);
@@ -180,6 +184,13 @@ public class FlinkDeploymentControllerTest {
         appCluster = TestUtils.clone(appCluster);
         appCluster.getSpec().getJob().setParallelism(100);
 
+        UpdateControl<FlinkDeployment> updateControl =
+                testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY
+                        .toUpdateControl(appCluster)
+                        .getScheduleDelay(),
+                updateControl.getScheduleDelay());
         testController.reconcile(appCluster, context);
         jobs = flinkService.listJobs();
         assertEquals(1, jobs.size());
@@ -200,6 +211,7 @@ public class FlinkDeploymentControllerTest {
     }
 
     private FlinkDeploymentController createTestController(TestingFlinkService 
flinkService) {
+        Observer observer = new Observer(flinkService);
         JobReconciler jobReconciler = new JobReconciler(null, flinkService);
         SessionReconciler sessionReconciler = new SessionReconciler(null, 
flinkService);
 
@@ -208,6 +220,7 @@ public class FlinkDeploymentControllerTest {
                 null,
                 "test",
                 new DefaultDeploymentValidator(),
+                observer,
                 jobReconciler,
                 sessionReconciler);
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
deleted file mode 100644
index 769af93..0000000
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.observer;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.TestUtils;
-import org.apache.flink.kubernetes.operator.TestingFlinkService;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.spec.JobState;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/** @link JobStatusObserver unit tests */
-public class JobStatusObserverTest {
-
-    @Test
-    public void observeSessionCluster() throws Exception {
-        FlinkService flinkService = new TestingFlinkService();
-        JobStatusObserver observer = new JobStatusObserver(flinkService);
-        FlinkDeployment deployment = TestUtils.buildSessionCluster();
-        deployment
-                .getStatus()
-                .getReconciliationStatus()
-                .setLastReconciledSpec(deployment.getSpec());
-        assertTrue(
-                observer.observeFlinkJobStatus(
-                        deployment,
-                        FlinkUtils.getEffectiveConfig(deployment, new 
Configuration())));
-    }
-
-    @Test
-    public void observeApplicationCluster() throws Exception {
-        TestingFlinkService flinkService = new TestingFlinkService();
-        JobStatusObserver observer = new JobStatusObserver(flinkService);
-        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
-        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new 
Configuration());
-
-        assertTrue(observer.observeFlinkJobStatus(deployment, conf));
-        deployment.setStatus(new FlinkDeploymentStatus());
-        deployment
-                .getStatus()
-                .getReconciliationStatus()
-                .setLastReconciledSpec(deployment.getSpec());
-
-        flinkService.submitApplicationCluster(deployment, conf);
-        assertTrue(observer.observeFlinkJobStatus(deployment, conf));
-
-        assertEquals(
-                deployment.getMetadata().getName(),
-                deployment.getStatus().getJobStatus().getJobName());
-
-        deployment.getSpec().getJob().setState(JobState.SUSPENDED);
-        flinkService.clear();
-        assertTrue(observer.observeFlinkJobStatus(deployment, conf));
-    }
-}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
new file mode 100644
index 0000000..4e7615c
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.observer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** @link Observer unit tests */
+public class ObserverTest {
+
+    private final Context readyContext =
+            JobReconcilerTest.createContextWithReadyJobManagerDeployment();
+
+    @Test
+    public void observeSessionCluster() {
+        FlinkService flinkService = new TestingFlinkService();
+        Observer observer = new Observer(flinkService);
+        FlinkDeployment deployment = TestUtils.buildSessionCluster();
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .setLastReconciledSpec(deployment.getSpec());
+
+        assertFalse(
+                observer.observe(
+                        deployment,
+                        readyContext,
+                        FlinkUtils.getEffectiveConfig(deployment, new 
Configuration())));
+
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+
+        assertTrue(
+                observer.observe(
+                        deployment,
+                        readyContext,
+                        FlinkUtils.getEffectiveConfig(deployment, new 
Configuration())));
+
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+    }
+
+    @Test
+    public void observeApplicationCluster() {
+        TestingFlinkService flinkService = new TestingFlinkService();
+        Observer observer = new Observer(flinkService);
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new 
Configuration());
+
+        assertTrue(observer.observe(deployment, 
TestUtils.createEmptyContext(), conf));
+
+        deployment.setStatus(new FlinkDeploymentStatus());
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                .setLastReconciledSpec(deployment.getSpec());
+        deployment.getStatus().setJobStatus(new JobStatus());
+        flinkService.submitApplicationCluster(deployment, conf);
+
+        // Validate port check logic
+        flinkService.setPortReady(false);
+
+        // Port not ready
+        assertFalse(observer.observe(deployment, readyContext, conf));
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+
+        assertFalse(observer.observe(deployment, readyContext, conf));
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+
+        flinkService.setPortReady(true);
+        // Port ready but we have to recheck once again
+        assertFalse(observer.observe(deployment, readyContext, conf));
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+
+        // Stable ready
+        assertTrue(observer.observe(deployment, readyContext, conf));
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+
+        assertTrue(observer.observe(deployment, readyContext, conf));
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+
+        assertEquals(
+                deployment.getMetadata().getName(),
+                deployment.getStatus().getJobStatus().getJobName());
+
+        // Test listing failure
+        flinkService.clear();
+        assertFalse(observer.observe(deployment, readyContext, conf));
+        assertEquals(
+                JobManagerDeploymentStatus.READY,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+        assertEquals("UNKNOWN", 
deployment.getStatus().getJobStatus().getState());
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index 6a2109b..2ee8793 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -115,7 +115,7 @@ public class JobReconcilerTest {
         final String expectedSavepointPath = "savepoint_0";
         final Context context = 
JobReconcilerTest.createContextWithReadyJobManagerDeployment();
         final TestingFlinkService flinkService = new TestingFlinkService();
-        JobStatusObserver observer = new JobStatusObserver(flinkService);
+        Observer observer = new Observer(flinkService);
 
         final JobReconciler reconciler = new JobReconciler(null, flinkService);
         final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 5ae554c..ec9ee63 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9077,6 +9077,13 @@ spec:
                   savepointLocation:
                     type: string
                 type: object
+              jobManagerDeploymentStatus:
+                enum:
+                - READY
+                - DEPLOYED_NOT_READY
+                - DEPLOYING
+                - MISSING
+                type: string
               reconciliationStatus:
                 properties:
                   success:

Reply via email to