This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new d75e45e [FLINK-28389] Correct spec and status updates in FlinkDeploymentControllerTest d75e45e is described below commit d75e45e45600f3c550b671d4883fb5ed9dcd96dc Author: Matyas Orhidi <matyas_orh...@apple.com> AuthorDate: Tue Jul 5 11:29:16 2022 +0200 [FLINK-28389] Correct spec and status updates in FlinkDeploymentControllerTest --- .../flink/kubernetes/operator/TestUtils.java | 31 ----- .../controller/DeploymentRecoveryTest.java | 9 +- .../controller/FlinkDeploymentControllerTest.java | 26 ++-- .../operator/controller/RollbackTest.java | 8 +- .../TestingFlinkDeploymentController.java | 147 +++++++++++++++++++++ 5 files changed, 163 insertions(+), 58 deletions(-) diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index cdbaa8e..f8b6290 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -21,8 +21,6 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory; -import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; -import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob; import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; @@ -38,13 +36,6 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; -import org.apache.flink.kubernetes.operator.metrics.MetricManager; -import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory; -import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory; -import org.apache.flink.kubernetes.operator.utils.EventRecorder; -import org.apache.flink.kubernetes.operator.utils.StatusRecorder; -import org.apache.flink.kubernetes.operator.utils.ValidatorUtils; -import org.apache.flink.metrics.testutils.MetricListener; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerStatus; @@ -60,7 +51,6 @@ import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.DeploymentCondition; import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; import io.fabric8.kubernetes.api.model.apps.DeploymentStatus; -import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.mockwebserver.utils.ResponseProvider; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Context; @@ -431,27 +421,6 @@ public class TestUtils { } } - public static FlinkDeploymentController createTestController( - FlinkConfigManager configManager, - KubernetesClient kubernetesClient, - TestingFlinkService flinkService, - StatusRecorder statusRecorder) { - var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {}); - return new FlinkDeploymentController( - configManager, - ValidatorUtils.discoverValidators(configManager), - new ReconcilerFactory( - kubernetesClient, - flinkService, - configManager, - eventRecorder, - statusRecorder), - new ObserverFactory(flinkService, configManager, statusRecorder, eventRecorder), - new MetricManager<>(new MetricListener().getMetricGroup()), - statusRecorder, - eventRecorder); - } - /** Testing ResponseProvider. */ public static class ValidatingResponseProvider<T> implements ResponseProvider<Object> { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java index 1425dcd..7ecd28f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java @@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus; -import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; @@ -52,7 +51,7 @@ public class DeploymentRecoveryTest { private TestingFlinkService flinkService; private Context context; - private FlinkDeploymentController testController; + private TestingFlinkDeploymentController testController; private KubernetesClient kubernetesClient; @@ -61,11 +60,7 @@ public class DeploymentRecoveryTest { flinkService = new TestingFlinkService(kubernetesClient); context = flinkService.getContext(); testController = - TestUtils.createTestController( - configManager, - kubernetesClient, - flinkService, - new StatusRecorder<>(kubernetesClient, (a, c) -> {})); + new TestingFlinkDeploymentController(configManager, kubernetesClient, flinkService); kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace(); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 788b770..54e0ace 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -40,7 +40,6 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler; import org.apache.flink.kubernetes.operator.utils.IngressUtils; -import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import org.apache.flink.runtime.client.JobStatusMessage; import io.fabric8.kubernetes.api.model.EventBuilder; @@ -75,7 +74,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; -/** @link FlinkDeploymentController tests */ +/** {@link FlinkDeploymentController} tests. */ @EnableKubernetesMockClient(crud = true) public class FlinkDeploymentControllerTest { @@ -83,22 +82,18 @@ public class FlinkDeploymentControllerTest { private TestingFlinkService flinkService; private Context context; - private FlinkDeploymentController testController; + private TestingFlinkDeploymentController testController; private KubernetesMockServer mockServer; private KubernetesClient kubernetesClient; - private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter(); @BeforeEach public void setup() { flinkService = new TestingFlinkService(kubernetesClient); context = flinkService.getContext(); + testController = - TestUtils.createTestController( - configManager, - kubernetesClient, - flinkService, - new StatusRecorder<>(kubernetesClient, statusUpdateCounter)); + new TestingFlinkDeploymentController(configManager, kubernetesClient, flinkService); kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace(); } @@ -121,7 +116,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RECONCILING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals(2, statusUpdateCounter.getCount()); + assertEquals(2, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -145,7 +140,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RECONCILING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals(3, statusUpdateCounter.getCount()); + assertEquals(3, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -159,7 +154,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RUNNING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals(4, statusUpdateCounter.getCount()); + assertEquals(4, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -174,7 +169,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RUNNING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals(4, statusUpdateCounter.getCount()); + assertEquals(4, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); assertEquals( Optional.of( @@ -200,7 +195,7 @@ public class FlinkDeploymentControllerTest { assertEquals( org.apache.flink.api.common.JobStatus.RUNNING.name(), appCluster.getStatus().getJobStatus().getState()); - assertEquals(5, statusUpdateCounter.getCount()); + assertEquals(5, testController.getInternalStatusUpdateCount()); assertFalse(updateControl.isUpdateStatus()); reconciliationStatus = appCluster.getStatus().getReconciliationStatus(); @@ -408,7 +403,7 @@ public class FlinkDeploymentControllerTest { .getSavepointInfo() .getSavepointHistory() .isEmpty()); - assertEquals(0, testController.reconcile(appCluster, context).getScheduleDelay().get()); + assertEquals(0L, testController.reconcile(appCluster, context).getScheduleDelay().get()); assertEquals( JobState.SUSPENDED, appCluster @@ -564,6 +559,7 @@ public class FlinkDeploymentControllerTest { var appCluster = TestUtils.buildApplicationCluster(flinkVersion); appCluster.getSpec().getJob().setUpgradeMode(upgradeMode); testUpgradeNotReadyCluster(ReconciliationUtils.clone(appCluster)); + assertEquals(upgradeMode, appCluster.getSpec().getJob().getUpgradeMode()); } @Test diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java index b591c7e..92ead48 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java @@ -31,7 +31,6 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; import org.apache.flink.kubernetes.operator.crd.status.Savepoint; import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType; -import org.apache.flink.kubernetes.operator.utils.StatusRecorder; import org.apache.flink.util.function.ThrowingRunnable; import io.fabric8.kubernetes.client.KubernetesClient; @@ -59,7 +58,7 @@ public class RollbackTest { private TestingFlinkService flinkService; private Context context; - private FlinkDeploymentController testController; + private TestingFlinkDeploymentController testController; private KubernetesClient kubernetesClient; @@ -68,11 +67,10 @@ public class RollbackTest { flinkService = new TestingFlinkService(kubernetesClient); context = flinkService.getContext(); testController = - TestUtils.createTestController( + new TestingFlinkDeploymentController( new FlinkConfigManager(new Configuration()), kubernetesClient, - flinkService, - new StatusRecorder<>(kubernetesClient, (a, c) -> {})); + flinkService); kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace(); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java new file mode 100644 index 0000000..d3909b4 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.controller; + +import org.apache.flink.kubernetes.operator.TestingFlinkService; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; +import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; +import org.apache.flink.kubernetes.operator.metrics.MetricManager; +import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.kubernetes.operator.utils.StatusRecorder; +import org.apache.flink.kubernetes.operator.utils.ValidatorUtils; +import org.apache.flink.metrics.testutils.MetricListener; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.Cleaner; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; +import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import org.junit.jupiter.api.Assertions; + +import java.util.Map; +import java.util.function.BiConsumer; + +/** A wrapper around {@link FlinkDeploymentController} used by unit tests. */ +public class TestingFlinkDeploymentController + implements Reconciler<FlinkDeployment>, + ErrorStatusHandler<FlinkDeployment>, + EventSourceInitializer<FlinkDeployment>, + Cleaner<FlinkDeployment> { + + private FlinkDeploymentController flinkDeploymentController; + private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter(); + private EventRecorder eventRecorder; + private StatusRecorder statusRecorder; + + public TestingFlinkDeploymentController( + FlinkConfigManager configManager, + KubernetesClient kubernetesClient, + TestingFlinkService flinkService) { + eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {}); + statusRecorder = new StatusRecorder<>(kubernetesClient, statusUpdateCounter); + + flinkDeploymentController = + new FlinkDeploymentController( + configManager, + ValidatorUtils.discoverValidators(configManager), + new ReconcilerFactory( + kubernetesClient, + flinkService, + configManager, + eventRecorder, + statusRecorder), + new ObserverFactory( + flinkService, configManager, statusRecorder, eventRecorder), + new MetricManager<>(new MetricListener().getMetricGroup()), + statusRecorder, + eventRecorder); + } + + @Override + public UpdateControl<FlinkDeployment> reconcile( + FlinkDeployment flinkDeployment, Context<FlinkDeployment> context) throws Exception { + FlinkDeployment cloned = ReconciliationUtils.clone(flinkDeployment); + statusUpdateCounter.setCurrent(flinkDeployment); + UpdateControl<FlinkDeployment> updateControl = + flinkDeploymentController.reconcile(cloned, context); + Assertions.assertTrue(updateControl.isNoUpdate()); + return updateControl; + } + + @Override + public DeleteControl cleanup( + FlinkDeployment flinkDeployment, Context<FlinkDeployment> context) { + FlinkDeployment cloned = ReconciliationUtils.clone(flinkDeployment); + statusUpdateCounter.setCurrent(flinkDeployment); + return flinkDeploymentController.cleanup(cloned, context); + } + + @Override + public ErrorStatusUpdateControl<FlinkDeployment> updateErrorStatus( + FlinkDeployment flinkDeployment, Context<FlinkDeployment> context, Exception e) { + FlinkDeployment cloned = ReconciliationUtils.clone(flinkDeployment); + statusUpdateCounter.setCurrent(flinkDeployment); + return flinkDeploymentController.updateErrorStatus(cloned, context, e); + } + + @Override + public Map<String, EventSource> prepareEventSources( + EventSourceContext<FlinkDeployment> eventSourceContext) { + throw new UnsupportedOperationException(); + } + + private static class StatusUpdateCounter + implements BiConsumer< + AbstractFlinkResource<?, FlinkDeploymentStatus>, FlinkDeploymentStatus> { + + private FlinkDeployment currentResource; + private int counter; + + @Override + public void accept( + AbstractFlinkResource<?, FlinkDeploymentStatus> + flinkDeploymentStatusAbstractFlinkResource, + FlinkDeploymentStatus flinkDeploymentStatus) { + currentResource.setStatus(flinkDeploymentStatusAbstractFlinkResource.getStatus()); + counter++; + } + + public void setCurrent(FlinkDeployment currentResource) { + this.currentResource = currentResource; + } + + public int getCount() { + return counter; + } + } + + public int getInternalStatusUpdateCount() { + return statusUpdateCounter.getCount(); + } +}