This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit db38c3de786d7b9a67cb1c6586b4a53c0f2bdd4f
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Mon Jun 27 11:33:09 2022 +0200

    [FLINK-27280] Unify stability checking for application/session jobs
---
 .../operator/observer/JobStatusObserver.java       |  2 ++
 .../deployment/AbstractDeploymentObserver.java     |  9 ++----
 .../observer/deployment/ApplicationObserver.java   | 32 +---------------------
 .../observer/deployment/SessionObserver.java       | 13 +++++----
 .../operator/reconciler/ReconciliationUtils.java   | 29 ++++++++++++++++++++
 .../sessionjob/SessionJobObserverTest.java         |  6 ++++
 6 files changed, 48 insertions(+), 43 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index 5c31d88..4836d71 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.observer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.EventUtils;
@@ -79,6 +80,7 @@ public abstract class JobStatusObserver<CTX> {
             } else {
                 updateJobStatus(resource, targetJobStatusMessage.get(), deployedConfig);
             }
+            ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus());
             return true;
         } else {
             ifRunningMoveToReconciling(jobStatus, previousJobStatus);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index 48fa2e0..f67bc53 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -95,11 +95,7 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
         }
 
         if (isJmDeploymentReady(flinkApp)) {
-            if (observeFlinkCluster(flinkApp, context, observeConfig)) {
-                if (reconciliationStatus.getState() != ReconciliationState.ROLLED_BACK) {
-                    reconciliationStatus.markReconciledSpecAsStable();
-                }
-            }
+            observeFlinkCluster(flinkApp, context, observeConfig);
         }
 
         if (isJmDeploymentReady(flinkApp)) {
@@ -312,8 +308,7 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
      * @param flinkApp the target flinkDeployment resource
      * @param context the context with which the operation is executed
      * @param deployedConfig config that is deployed on the Flink cluster
-     * @return true if cluster state is stable
      */
-    protected abstract boolean observeFlinkCluster(
+    protected abstract void observeFlinkCluster(
             FlinkDeployment flinkApp, Context context, Configuration deployedConfig);
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index 27a3150..0164169 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -20,7 +20,6 @@ package org.apache.flink.kubernetes.operator.observer.deployment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
@@ -36,9 +35,6 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.flink.api.common.JobStatus.FINISHED;
-import static org.apache.flink.api.common.JobStatus.RUNNING;
-
 /** The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
 public class ApplicationObserver extends AbstractDeploymentObserver {
 
@@ -72,7 +68,7 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
     }
 
     @Override
-    protected boolean observeFlinkCluster(
+    protected void observeFlinkCluster(
             FlinkDeployment flinkApp, Context context, Configuration deployedConfig) {
 
         boolean jobFound =
@@ -83,31 +79,5 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
         if (jobFound) {
             savepointObserver.observeSavepointStatus(flinkApp, deployedConfig);
         }
-        return isJobStable(flinkApp.getStatus());
-    }
-
-    private boolean isJobStable(FlinkDeploymentStatus deploymentStatus) {
-        var flinkJobStatus =
-                org.apache.flink.api.common.JobStatus.valueOf(
-                        deploymentStatus.getJobStatus().getState());
-
-        if (flinkJobStatus == RUNNING) {
-            // Running jobs are currently always marked stable
-            return true;
-        }
-
-        var reconciledJobState =
-                deploymentStatus
-                        .getReconciliationStatus()
-                        .deserializeLastReconciledSpec()
-                        .getJob()
-                        .getState();
-
-        if (reconciledJobState == JobState.RUNNING && flinkJobStatus == FINISHED) {
-            // If the job finished on its own, it's marked stable
-            return true;
-        }
-
-        return false;
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
index fc6f4ed..3debfd9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.observer.deployment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
@@ -38,19 +39,21 @@ public class SessionObserver extends AbstractDeploymentObserver {
     }
 
     @Override
-    public boolean observeFlinkCluster(
-            FlinkDeployment flinkApp, Context context, Configuration deployedConfig) {
+    public void observeFlinkCluster(
+            FlinkDeployment deployment, Context context, Configuration deployedConfig) {
         // Check if session cluster can serve rest calls following our practice in JobObserver
         try {
             flinkService.listJobs(deployedConfig);
-            return true;
+            var rs = deployment.getStatus().getReconciliationStatus();
+            if (rs.getState() == ReconciliationState.DEPLOYED) {
+                rs.markReconciledSpecAsStable();
+            }
         } catch (Exception e) {
             logger.error("REST service in session cluster is bad now", e);
             if (e instanceof TimeoutException) {
                 // check for problems with the underlying deployment
-                observeJmDeployment(flinkApp, context, deployedConfig);
+                observeJmDeployment(deployment, context, deployedConfig);
             }
-            return false;
         }
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index ddbc79f..d35cd2a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -52,6 +52,9 @@ import javax.annotation.Nullable;
 
 import java.util.Optional;
 
+import static org.apache.flink.api.common.JobStatus.FINISHED;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+
 /** Reconciliation utilities. */
 public class ReconciliationUtils {
 
@@ -364,4 +367,30 @@ public class ReconciliationUtils {
 
         return lastSpecWithMeta.f1.get("metadata").get("generation").asLong(-1L);
     }
+
+    public static void checkAndUpdateStableSpec(CommonStatus<?> status) {
+        var flinkJobStatus =
+                org.apache.flink.api.common.JobStatus.valueOf(status.getJobStatus().getState());
+
+        if (status.getReconciliationStatus().getState() != ReconciliationState.DEPLOYED) {
+            return;
+        }
+
+        if (flinkJobStatus == RUNNING) {
+            // Running jobs are currently always marked stable
+            status.getReconciliationStatus().markReconciledSpecAsStable();
+            return;
+        }
+
+        var reconciledJobState =
+                status.getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getState();
+
+        if (reconciledJobState == JobState.RUNNING && flinkJobStatus == FINISHED) {
+            // If the job finished on its own, it's marked stable
+            status.getReconciliationStatus().markReconciledSpecAsStable();
+        }
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index a73e000..6e3a0f6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -80,10 +80,16 @@ public class SessionJobObserverTest {
         Assertions.assertEquals(
                 JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
 
+        var reconStatus = sessionJob.getStatus().getReconciliationStatus();
+        Assertions.assertNotEquals(
+                reconStatus.getLastReconciledSpec(), reconStatus.getLastStableSpec());
+
         // observe with ready context
         observer.observe(sessionJob, readyContext);
         Assertions.assertEquals(
                 JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
+        Assertions.assertEquals(
+                reconStatus.getLastReconciledSpec(), reconStatus.getLastStableSpec());
 
         flinkService.setPortReady(false);
         observer.observe(sessionJob, readyContext);

Reply via email to