This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 6d7c8cee [FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present 6d7c8cee is described below commit 6d7c8ceebb9ee0b83eb7036c57003177b7f2fd82 Author: Thomas Weise <t...@apache.org> AuthorDate: Tue Aug 30 21:17:33 2022 -0400 [FLINK-29100] Relax upgrade checks to allow stateless restart when no stable spec is present --- .../deployment/ApplicationReconciler.java | 41 +++++++++++++++++++--- .../deployment/ApplicationReconcilerTest.java | 38 ++++++++++++++++++++ 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index 42aabaad..43520df3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.service.FlinkService; @@ -98,12 +99,19 @@ public class ApplicationReconciler .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED) && FlinkUtils.isKubernetesHAActivated(deployConfig) && 
FlinkUtils.isKubernetesHAActivated(observeConfig) - && flinkService.isHaMetadataAvailable(deployConfig) && !flinkVersionChanged( ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) { - LOG.info( - "Job is not running but HA metadata is available for last state restore, ready for upgrade"); - return Optional.of(UpgradeMode.LAST_STATE); + + if (!flinkService.isHaMetadataAvailable(deployConfig)) { + if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) { + // initial deployment failure, reset to allow for spec change to proceed + return resetOnMissingStableSpec(deployment, deployConfig); + } + } else { + LOG.info( + "Job is not running but HA metadata is available for last state restore, ready for upgrade"); + return Optional.of(UpgradeMode.LAST_STATE); + } } if (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING @@ -120,6 +128,31 @@ public class ApplicationReconciler return Optional.empty(); } + private Optional<UpgradeMode> resetOnMissingStableSpec( + FlinkDeployment deployment, Configuration deployConfig) { + // initial deployment failure, reset to allow for spec change to proceed + flinkService.deleteClusterDeployment( + deployment.getMetadata(), deployment.getStatus(), false); + flinkService.waitForClusterShutdown(deployConfig); + if (!flinkService.isHaMetadataAvailable(deployConfig)) { + LOG.info( + "Job never entered stable state. 
Clearing previous spec to reset for initial deploy"); + // TODO: lastSpecWithMeta.f1.isFirstDeployment() is false + // ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment); + deployment.getStatus().getReconciliationStatus().setLastReconciledSpec(null); + // UPGRADING triggers immediate reconciliation + deployment + .getStatus() + .getReconciliationStatus() + .setState(ReconciliationState.UPGRADING); + return Optional.empty(); + } else { + // proceed with upgrade if deployment succeeded between check and delete + LOG.info("Found HA state after deployment deletion, falling back to stateful upgrade"); + return Optional.of(UpgradeMode.LAST_STATE); + } + } + @Override protected void deploy( FlinkDeployment relatedResource, diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index 73a707b0..e1f04ec5 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -147,6 +147,11 @@ public class ApplicationReconcilerTest { deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE); deployment.getSpec().setRestartNonce(100L); + deployment + .getStatus() + .getReconciliationStatus() + .setLastStableSpec( + deployment.getStatus().getReconciliationStatus().getLastReconciledSpec()); flinkService.setHaDataAvailable(false); deployment.getStatus().getJobStatus().setState("RECONCILING"); @@ -184,6 +189,39 @@ public class ApplicationReconcilerTest { assertEquals("finished_sp", runningJobs.get(0).f0); } + @ParameterizedTest + @EnumSource(UpgradeMode.class) + public void testUpgradeBeforeReachingStableSpec(UpgradeMode 
upgradeMode) throws Exception { + flinkService.setHaDataAvailable(false); + + final FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + + reconciler.reconcile(deployment, context); + assertEquals( + JobManagerDeploymentStatus.DEPLOYING, + deployment.getStatus().getJobManagerDeploymentStatus()); + + // Ready for spec changes, the reconciliation should be performed + final String newImage = "new-image-1"; + deployment.getSpec().getJob().setUpgradeMode(upgradeMode); + deployment.getSpec().setImage(newImage); + reconciler.reconcile(deployment, context); + if (!UpgradeMode.STATELESS.equals(upgradeMode)) { + assertNull(deployment.getStatus().getReconciliationStatus().getLastReconciledSpec()); + assertEquals( + ReconciliationState.UPGRADING, + deployment.getStatus().getReconciliationStatus().getState()); + reconciler.reconcile(deployment, context); + } + assertEquals( + newImage, + deployment + .getStatus() + .getReconciliationStatus() + .deserializeLastReconciledSpec() + .getImage()); + } + @Test public void testUpgradeModeChangeFromSavepointToLastState() throws Exception { final String expectedSavepointPath = "savepoint_0";