This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 5fae3534b66fbc428f982389b9ead6ed137f6db2 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Thu Feb 10 15:29:35 2022 +0100 Add job reconciler test --- .../flink/kubernetes/operator/TestUtils.java | 90 +++++++++++++++++++ .../observer/JobStatusObserverTest.java | 54 ++--------- .../operator/reconciler/JobReconcilerTest.java | 100 +++++++++++++++++++++ 3 files changed, 195 insertions(+), 49 deletions(-) diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java new file mode 100644 index 0000000..4ef6860 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator; + +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; +import org.apache.flink.kubernetes.operator.crd.spec.JobState; +import org.apache.flink.kubernetes.operator.crd.spec.Resource; +import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; + +import java.util.Collections; + +/** Testing utilities. */ +public class TestUtils { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + public static final String TEST_NAMESPACE = "flink-operator-test"; + public static final String SERVICE_ACCOUNT = "flink-operator"; + public static final String FLINK_VERSION = "latest"; + public static final String IMAGE = String.format("flink:%s", FLINK_VERSION); + + public static FlinkDeployment buildSessionCluster() { + FlinkDeployment deployment = new FlinkDeployment(); + deployment.setMetadata( + new ObjectMetaBuilder() + .withName("test-cluster") + .withNamespace(TEST_NAMESPACE) + .build()); + deployment.setSpec( + FlinkDeploymentSpec.builder() + .image(IMAGE) + .flinkVersion(FLINK_VERSION) + .flinkConfiguration( + Collections.singletonMap( + KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT.key(), + SERVICE_ACCOUNT)) + .jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, null)) + .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 2, null)) + .build()); + return deployment; + } + + public static FlinkDeployment buildApplicationCluster() { + FlinkDeployment deployment = buildSessionCluster(); + deployment + .getSpec() + .setJob( + JobSpec.builder() + .jarURI("local:///tmp/sample.jar") + .state(JobState.RUNNING) + .build()); + return deployment; + } + + public static <T> T clone(T object) { + if (object == null) { + return null; + } + try { + return (T) + objectMapper.readValue( + objectMapper.writeValueAsString(object), object.getClass()); + } catch (JsonProcessingException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java similarity index 61% rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserverTest.java rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java index 0256e87..37da2f9 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java @@ -15,26 +15,19 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.controller.observer; +package org.apache.flink.kubernetes.operator.observer; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; -import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec; -import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec; -import org.apache.flink.kubernetes.operator.crd.spec.JobSpec; import org.apache.flink.kubernetes.operator.crd.spec.JobState; -import org.apache.flink.kubernetes.operator.crd.spec.Resource; -import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec; import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus; -import org.apache.flink.kubernetes.operator.observer.JobStatusObserver; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; import org.apache.flink.runtime.client.JobStatusMessage; -import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -52,18 +45,14 @@ import static org.mockito.Mockito.when; /** @link JobStatusObserver unit tests */ public class JobStatusObserverTest { - private static final String TEST_NAMESPACE = "flink-operator-test"; - private static final String SERVICE_ACCOUNT = "flink-operator"; - private static final String FLINK_VERSION = "latest"; - private static final String IMAGE = String.format("flink:%s", FLINK_VERSION); - private static final String JOB_NAME = "test1"; + public static final String JOB_NAME = "test1"; private FlinkService flinkService = Mockito.mock(FlinkService.class); @Test public void observeSessionCluster() { JobStatusObserver observer = new JobStatusObserver(flinkService); - FlinkDeployment deployment = buildSessionCluster(); + FlinkDeployment deployment = TestUtils.buildSessionCluster(); deployment.setStatus(new FlinkDeploymentStatus()); deployment.getStatus().setSpec(deployment.getSpec()); assertTrue( @@ -74,7 +63,7 @@ public class JobStatusObserverTest { @Test public void observeApplicationCluster() throws Exception { JobStatusObserver observer = new JobStatusObserver(flinkService); - FlinkDeployment deployment = buildApplicationCluster(); + FlinkDeployment deployment = TestUtils.buildApplicationCluster(); assertTrue( observer.observeFlinkJobStatus( deployment, FlinkUtils.getEffectiveConfig(deployment))); @@ -104,37 +93,4 @@ public class JobStatusObserverTest { deployment, FlinkUtils.getEffectiveConfig(deployment))); verify(flinkService, times(2)).listJobs(any(Configuration.class)); } - - private static FlinkDeployment buildSessionCluster() { - FlinkDeployment deployment = new FlinkDeployment(); - deployment.setMetadata( - new ObjectMetaBuilder() - .withName("test-cluster") - .withNamespace(TEST_NAMESPACE) - .build()); - deployment.setSpec( - FlinkDeploymentSpec.builder() - .image(IMAGE) - .flinkVersion(FLINK_VERSION) - .flinkConfiguration( - Collections.singletonMap( - KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT.key(), - SERVICE_ACCOUNT)) - .jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, null)) - .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 2, null)) - .build()); - return deployment; - } - - private FlinkDeployment buildApplicationCluster() { - FlinkDeployment deployment = buildSessionCluster(); - deployment - .getSpec() - .setJob( - JobSpec.builder() - .jarURI("local:///tmp/sample.jar") - .state(JobState.RUNNING) - .build()); - return deployment; - } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java new file mode 100644 index 0000000..77da7b4 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.reconciler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.crd.FlinkDeployment; +import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode; +import org.apache.flink.kubernetes.operator.crd.status.JobStatus; +import org.apache.flink.kubernetes.operator.service.FlinkService; +import org.apache.flink.kubernetes.operator.utils.FlinkUtils; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; + +import io.fabric8.kubernetes.client.KubernetesClient; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; + +/** @link JobStatusObserver unit tests */ +public class JobReconcilerTest { + + public static final String JOB_NAME = "test1"; + public static final String JOB_ID = "fd72014d4c864993a2e5a9287b4a9c5d"; + + private FlinkService flinkService = Mockito.mock(FlinkService.class); + + @Test + public void testUpgrade() throws Exception { + KubernetesClient kubernetesClient = Mockito.mock(KubernetesClient.class); + JobReconciler reconciler = new JobReconciler(kubernetesClient, flinkService); + FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + Configuration config = FlinkUtils.getEffectiveConfig(deployment); + + reconciler.reconcile("test", deployment, config); + Mockito.verify(flinkService, times(1)).submitApplicationCluster(eq(deployment), eq(config)); + Mockito.clearInvocations(flinkService); + deployment.getStatus().setSpec(deployment.getSpec()); + + JobStatus jobStatus = new JobStatus(); + jobStatus.setJobName(JOB_NAME); + jobStatus.setJobId(JOB_ID); + jobStatus.setState("RUNNING"); + + deployment.getStatus().setJobStatus(jobStatus); + + // Test stateless upgrade + FlinkDeployment statelessUpgrade = TestUtils.clone(deployment); + statelessUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS); + statelessUpgrade.getSpec().getFlinkConfiguration().put("new", "conf"); + reconciler.reconcile("test", statelessUpgrade, config); + Mockito.verify(flinkService, times(1)) + .cancelJob(eq(JobID.fromHexString(JOB_ID)), eq(UpgradeMode.STATELESS), eq(config)); + + Mockito.verify(flinkService, times(1)) + .submitApplicationCluster(eq(statelessUpgrade), eq(config)); + + Mockito.clearInvocations(flinkService); + + // Test stateful upgrade + FlinkDeployment statefulUpgrade = TestUtils.clone(deployment); + statefulUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT); + statefulUpgrade.getSpec().getFlinkConfiguration().put("new", "conf"); + + Mockito.doReturn(Optional.of("sp")).when(flinkService).cancelJob(any(), any(), any()); + + reconciler.reconcile("test", statefulUpgrade, new Configuration(config)); + Mockito.verify(flinkService, times(1)) + .cancelJob(eq(JobID.fromHexString(JOB_ID)), eq(UpgradeMode.SAVEPOINT), eq(config)); + + ArgumentCaptor<Configuration> argument = ArgumentCaptor.forClass(Configuration.class); + Mockito.verify(flinkService, times(1)) + .submitApplicationCluster(eq(statefulUpgrade), argument.capture()); + assertEquals("sp", argument.getValue().get(SavepointConfigOptions.SAVEPOINT_PATH)); + + Mockito.verifyNoMoreInteractions(flinkService); + } +}