This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit db38c3de786d7b9a67cb1c6586b4a53c0f2bdd4f Author: Gyula Fora <g_f...@apple.com> AuthorDate: Mon Jun 27 11:33:09 2022 +0200 [FLINK-27280] Unify stability checking for application/session jobs --- .../operator/observer/JobStatusObserver.java | 2 ++ .../deployment/AbstractDeploymentObserver.java | 9 ++---- .../observer/deployment/ApplicationObserver.java | 32 +--------------------- .../observer/deployment/SessionObserver.java | 13 +++++---- .../operator/reconciler/ReconciliationUtils.java | 29 ++++++++++++++++++++ .../sessionjob/SessionJobObserverTest.java | 6 ++++ 6 files changed, 48 insertions(+), 43 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java index 5c31d88..4836d71 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java @@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.observer; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.EventUtils; @@ -79,6 +80,7 @@ public abstract class JobStatusObserver<CTX> { } else { updateJobStatus(resource, targetJobStatusMessage.get(), deployedConfig); } + ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus()); return true; } else { ifRunningMoveToReconciling(jobStatus, previousJobStatus); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java index 48fa2e0..f67bc53 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java @@ -95,11 +95,7 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy } if (isJmDeploymentReady(flinkApp)) { - if (observeFlinkCluster(flinkApp, context, observeConfig)) { - if (reconciliationStatus.getState() != ReconciliationState.ROLLED_BACK) { - reconciliationStatus.markReconciledSpecAsStable(); - } - } + observeFlinkCluster(flinkApp, context, observeConfig); } if (isJmDeploymentReady(flinkApp)) { @@ -312,8 +308,7 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy * @param flinkApp the target flinkDeployment resource * @param context the context with which the operation is executed * @param deployedConfig config that is deployed on the Flink cluster - * @return true if cluster state is stable */ - protected abstract boolean observeFlinkCluster( + protected abstract void observeFlinkCluster( FlinkDeployment flinkApp, Context context, Configuration deployedConfig); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java index 27a3150..0164169 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java @@ -20,7 +20,6 @@ package org.apache.flink.kubernetes.operator.observer.deployment; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; -import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; @@ -36,9 +35,6 @@ import io.javaoperatorsdk.operator.api.reconciler.Context; import java.util.List; import java.util.Optional; -import static org.apache.flink.api.common.JobStatus.FINISHED; -import static org.apache.flink.api.common.JobStatus.RUNNING; - /** The observer of {@link org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */ public class ApplicationObserver extends AbstractDeploymentObserver { @@ -72,7 +68,7 @@ public class ApplicationObserver extends AbstractDeploymentObserver { } @Override - protected boolean observeFlinkCluster( + protected void observeFlinkCluster( FlinkDeployment flinkApp, Context context, Configuration deployedConfig) { boolean jobFound = @@ -83,31 +79,5 @@ public class ApplicationObserver extends AbstractDeploymentObserver { if (jobFound) { savepointObserver.observeSavepointStatus(flinkApp, deployedConfig); } - return isJobStable(flinkApp.getStatus()); - } - - private boolean isJobStable(FlinkDeploymentStatus deploymentStatus) { - var flinkJobStatus = - org.apache.flink.api.common.JobStatus.valueOf( - deploymentStatus.getJobStatus().getState()); - - if (flinkJobStatus == RUNNING) { - // Running jobs are currently always marked stable - return true; - } - - var reconciledJobState = - deploymentStatus - .getReconciliationStatus() - .deserializeLastReconciledSpec() - .getJob() - .getState(); - - if (reconciledJobState == JobState.RUNNING && flinkJobStatus == FINISHED) { - // If the job finished on its own, it's marked stable - return true; - } - - return false; } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java index fc6f4ed..3debfd9 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java @@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.observer.deployment; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.EventRecorder; @@ -38,19 +39,21 @@ public class SessionObserver extends AbstractDeploymentObserver { } @Override - public boolean observeFlinkCluster( - FlinkDeployment flinkApp, Context context, Configuration deployedConfig) { + public void observeFlinkCluster( + FlinkDeployment deployment, Context context, Configuration deployedConfig) { // Check if session cluster can serve rest calls following our practice in JobObserver try { flinkService.listJobs(deployedConfig); - return true; + var rs = deployment.getStatus().getReconciliationStatus(); + if (rs.getState() == ReconciliationState.DEPLOYED) { + rs.markReconciledSpecAsStable(); + } } catch (Exception e) { logger.error("REST service in session cluster is bad now", e); if (e instanceof TimeoutException) { // check for problems with the underlying deployment - observeJmDeployment(flinkApp, context, deployedConfig); + observeJmDeployment(deployment, context, deployedConfig); } - return false; } } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java index ddbc79f..d35cd2a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java @@ -52,6 +52,9 @@ import javax.annotation.Nullable; import java.util.Optional; +import static org.apache.flink.api.common.JobStatus.FINISHED; +import static org.apache.flink.api.common.JobStatus.RUNNING; + /** Reconciliation utilities. */ public class ReconciliationUtils { @@ -364,4 +367,30 @@ public class ReconciliationUtils { return lastSpecWithMeta.f1.get("metadata").get("generation").asLong(-1L); } + + public static void checkAndUpdateStableSpec(CommonStatus<?> status) { + var flinkJobStatus = + org.apache.flink.api.common.JobStatus.valueOf(status.getJobStatus().getState()); + + if (status.getReconciliationStatus().getState() != ReconciliationState.DEPLOYED) { + return; + } + + if (flinkJobStatus == RUNNING) { + // Running jobs are currently always marked stable + status.getReconciliationStatus().markReconciledSpecAsStable(); + return; + } + + var reconciledJobState = + status.getReconciliationStatus() + .deserializeLastReconciledSpec() + .getJob() + .getState(); + + if (reconciledJobState == JobState.RUNNING && flinkJobStatus == FINISHED) { + // If the job finished on its own, it's marked stable + status.getReconciliationStatus().markReconciledSpecAsStable(); + } + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java index a73e000..6e3a0f6 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java @@ -80,10 +80,16 @@ public class SessionJobObserverTest { Assertions.assertEquals( JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState()); + var reconStatus = sessionJob.getStatus().getReconciliationStatus(); + Assertions.assertNotEquals( + reconStatus.getLastReconciledSpec(), reconStatus.getLastStableSpec()); + // observe with ready context observer.observe(sessionJob, readyContext); Assertions.assertEquals( JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState()); + Assertions.assertEquals( + reconStatus.getLastReconciledSpec(), reconStatus.getLastStableSpec()); flinkService.setPortReady(false); observer.observe(sessionJob, readyContext);