This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new a2db6c3 [FLINK-28331] Persist status after every observe loop a2db6c3 is described below commit a2db6c31e21d79715324ed45d7138cb8f6d6149e Author: Matyas Orhidi <matyas_orh...@apple.com> AuthorDate: Fri Jul 1 14:24:33 2022 +0200 [FLINK-28331] Persist status after every observe loop --- .../controller/FlinkDeploymentController.java | 1 + .../controller/FlinkSessionJobController.java | 1 + .../operator/observer/SavepointObserver.java | 30 -------- .../flink/kubernetes/operator/TestUtils.java | 6 +- .../controller/DeploymentRecoveryTest.java | 7 +- .../controller/FlinkDeploymentControllerTest.java | 79 +++++++++++++++++++++- .../operator/controller/RollbackTest.java | 4 +- 7 files changed, 90 insertions(+), 38 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index d36f96b..d75d29c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -117,6 +117,7 @@ public class FlinkDeploymentController previousDeployment, false); } + statusRecorder.patchAndCacheStatus(flinkApp); reconcilerFactory.getOrCreate(flinkApp).reconcile(flinkApp, context); } catch (DeploymentFailedException dfe) { handleDeploymentFailed(flinkApp, dfe); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java index 30ff1ea..7f2f6bd 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java @@ -96,6 +96,7 @@ public class FlinkSessionJobController } try { + statusRecorder.patchAndCacheStatus(flinkSessionJob); reconciler.reconcile(flinkSessionJob, context); } catch (Exception e) { throw new ReconciliationException(e); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java index 74f3d35..aa3590a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java @@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Iterator; import java.util.List; -import java.util.Optional; /** An observer of savepoint progress. */ public class SavepointObserver<STATUS extends CommonStatus<?>> { @@ -69,10 +68,6 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> { var jobStatus = resource.getStatus().getJobStatus(); var savepointInfo = jobStatus.getSavepointInfo(); var jobId = jobStatus.getJobId(); - var previousLastSpPath = - Optional.ofNullable(savepointInfo.getLastSavepoint()) - .map(Savepoint::getLocation) - .orElse(null); // If any manual or periodic savepoint is in progress, observe it if (SavepointUtils.savepointInProgress(jobStatus)) { @@ -83,8 +78,6 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> { if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) { observeLatestSavepoint(savepointInfo, jobId, deployedConfig); } - - patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath); } /** @@ -201,27 +194,4 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> { throw new ReconciliationException(e); } } - - /** - * Patch the Kubernetes Flink resource status if we observed a new last savepoint. This is - * crucial to not lose this information once the reconciler shuts down the cluster. - */ - private void patchStatusOnSavepointChange( - AbstractFlinkResource<?, STATUS> resource, - SavepointInfo savepointInfo, - String previousLastSpPath) { - var currentLastSpPath = - Optional.ofNullable(savepointInfo.getLastSavepoint()) - .map(Savepoint::getLocation) - .orElse(null); - - // If the last savepoint information changes we need to patch the status - // to avoid losing this in case of an operator failure after the cluster was shut down - if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) { - LOG.info( - "Updating resource status after observing new last savepoint {}", - currentLastSpPath); - statusRecorder.patchAndCacheStatus(resource); - } - } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index 034904d..cdbaa8e 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -434,10 +434,8 @@ public class TestUtils { public static FlinkDeploymentController createTestController( FlinkConfigManager configManager, KubernetesClient kubernetesClient, - TestingFlinkService flinkService) { - - var statusRecorder = - new StatusRecorder<FlinkDeploymentStatus>(kubernetesClient, (r, s) -> {}); + TestingFlinkService flinkService, + StatusRecorder statusRecorder) { var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {}); return new FlinkDeploymentController( configManager, diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java index a9d59fe..1425dcd 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java @@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; @@ -60,7 +61,11 @@ public class DeploymentRecoveryTest { flinkService = new TestingFlinkService(kubernetesClient); context = flinkService.getContext(); testController = - TestUtils.createTestController(configManager, kubernetesClient, flinkService); + TestUtils.createTestController( + configManager, + kubernetesClient, + flinkService, + new StatusRecorder<>(kubernetesClient, (a, c) -> {})); kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace(); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index b7c1f1c..c809418 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -24,12 +24,14 @@ import org.apache.flink.configuration.RestOptions; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobState; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; +import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.JobStatus; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus; @@ -38,6 +40,7 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler; import org.apache.flink.kubernetes.operator.utils.IngressUtils; +import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import org.apache.flink.runtime.client.JobStatusMessage; import io.fabric8.kubernetes.api.model.EventBuilder; @@ -60,6 +63,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.BiConsumer; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -82,24 +86,41 @@ public class FlinkDeploymentControllerTest { private KubernetesMockServer mockServer; private KubernetesClient kubernetesClient; + private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter(); @BeforeEach public void setup() { flinkService = new TestingFlinkService(kubernetesClient); context = flinkService.getContext(); testController = - TestUtils.createTestController(configManager, kubernetesClient, flinkService); + TestUtils.createTestController( + configManager, + kubernetesClient, + flinkService, + new StatusRecorder<>(kubernetesClient, statusUpdateCounter)); kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace(); } @ParameterizedTest @EnumSource(FlinkVersion.class) public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception { - FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion); UpdateControl<FlinkDeployment> updateControl; + FlinkDeployment appCluster = TestUtils.buildApplicationCluster(FlinkVersion.v1_16); + assertEquals( + JobManagerDeploymentStatus.MISSING, + appCluster.getStatus().getJobManagerDeploymentStatus()); + assertNull(appCluster.getStatus().getJobStatus().getState()); + updateControl = testController.reconcile(appCluster, context); + assertEquals( + JobManagerDeploymentStatus.DEPLOYING, + appCluster.getStatus().getJobManagerDeploymentStatus()); + assertEquals( + org.apache.flink.api.common.JobStatus.RECONCILING.name(), + appCluster.getStatus().getJobStatus().getState()); + assertEquals(2, statusUpdateCounter.getCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -117,6 +138,13 @@ public class FlinkDeploymentControllerTest { assertNull(appCluster.getStatus().getReconciliationStatus().getLastStableSpec()); updateControl = testController.reconcile(appCluster, context); + assertEquals( + JobManagerDeploymentStatus.DEPLOYED_NOT_READY, + appCluster.getStatus().getJobManagerDeploymentStatus()); + assertEquals( + org.apache.flink.api.common.JobStatus.RECONCILING.name(), + appCluster.getStatus().getJobStatus().getState()); + assertEquals(3, statusUpdateCounter.getCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -124,6 +152,28 @@ public class FlinkDeploymentControllerTest { updateControl.getScheduleDelay()); updateControl = testController.reconcile(appCluster, context); + assertEquals( + JobManagerDeploymentStatus.READY, + appCluster.getStatus().getJobManagerDeploymentStatus()); + assertEquals( + org.apache.flink.api.common.JobStatus.RUNNING.name(), + appCluster.getStatus().getJobStatus().getState()); + assertEquals(4, statusUpdateCounter.getCount()); + assertFalse(updateControl.isUpdateStatus()); + assertEquals( + Optional.of( + configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), + updateControl.getScheduleDelay()); + + // Stable loop + updateControl = testController.reconcile(appCluster, context); + assertEquals( + JobManagerDeploymentStatus.READY, + appCluster.getStatus().getJobManagerDeploymentStatus()); + assertEquals( + org.apache.flink.api.common.JobStatus.RUNNING.name(), + appCluster.getStatus().getJobStatus().getState()); + assertEquals(4, statusUpdateCounter.getCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -143,6 +193,13 @@ public class FlinkDeploymentControllerTest { // Send in invalid update appCluster.getSpec().setJob(null); updateControl = testController.reconcile(appCluster, context); + assertEquals( + JobManagerDeploymentStatus.READY, + appCluster.getStatus().getJobManagerDeploymentStatus()); + assertEquals( + org.apache.flink.api.common.JobStatus.RUNNING.name(), + appCluster.getStatus().getJobStatus().getState()); + assertEquals(5, statusUpdateCounter.getCount()); assertFalse(updateControl.isUpdateStatus()); reconciliationStatus = appCluster.getStatus().getReconciliationStatus(); @@ -808,4 +865,22 @@ public class FlinkDeploymentControllerTest { } return args.stream(); } + + private static class StatusUpdateCounter + implements BiConsumer< + AbstractFlinkResource<?, FlinkDeploymentStatus>, FlinkDeploymentStatus> { + private int counter; + + @Override + public void accept( + AbstractFlinkResource<?, FlinkDeploymentStatus> + flinkDeploymentStatusAbstractFlinkResource, + FlinkDeploymentStatus flinkDeploymentStatus) { + counter++; + } + + public int getCount() { + return counter; + } + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java index 1f53a50..b591c7e 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java @@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; import org.apache.flink.kubernetes.operator.crd.status.Savepoint; import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType; +import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import org.apache.flink.util.function.ThrowingRunnable; import io.fabric8.kubernetes.client.KubernetesClient; @@ -70,7 +71,8 @@ public class RollbackTest { TestUtils.createTestController( new FlinkConfigManager(new Configuration()), kubernetesClient, - flinkService); + flinkService, + new StatusRecorder<>(kubernetesClient, (a, c) -> {})); kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace(); }