This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.1 in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/release-1.1 by this push: new d0c9f6ba [FLINK-28845] Do not ignore initialSavepointPath if first deploy fails completely d0c9f6ba is described below commit d0c9f6ba714d5265ec55154213f3b2c383831cdc Author: Gyula Fora <g_f...@apple.com> AuthorDate: Tue Aug 9 08:51:14 2022 +0200 [FLINK-28845] Do not ignore initialSavepointPath if first deploy fails completely --- .../operator/crd/status/ReconciliationStatus.java | 6 +-- .../deployment/AbstractDeploymentObserver.java | 7 ++-- .../observer/sessionjob/SessionJobObserver.java | 1 + .../reconciler/ReconciliationMetadata.java | 49 ++++++++++++++++++++++ .../operator/reconciler/ReconciliationUtils.java | 47 ++++++++++++++------- .../controller/FlinkDeploymentControllerTest.java | 16 +++++++ .../deployment/ApplicationObserverTest.java | 17 +++++++- .../observer/deployment/SessionObserverTest.java | 2 +- .../sessionjob/SessionJobObserverTest.java | 16 +++++++ 9 files changed, 137 insertions(+), 24 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java index b867887f..380971b7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java @@ -21,10 +21,10 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationMetadata; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import com.fasterxml.jackson.annotation.JsonIgnore; 
-import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.Data; import lombok.NoArgsConstructor; @@ -68,12 +68,12 @@ public abstract class ReconciliationStatus<SPEC extends AbstractFlinkSpec> { } @JsonIgnore - public Tuple2<SPEC, ObjectNode> deserializeLastReconciledSpecWithMeta() { + public Tuple2<SPEC, ReconciliationMetadata> deserializeLastReconciledSpecWithMeta() { return ReconciliationUtils.deserializeSpecWithMeta(lastReconciledSpec, getSpecClass()); } @JsonIgnore - public Tuple2<SPEC, ObjectNode> deserializeLastStableSpecWithMeta() { + public Tuple2<SPEC, ReconciliationMetadata> deserializeLastStableSpecWithMeta() { return ReconciliationUtils.deserializeSpecWithMeta(lastStableSpec, getSpecClass()); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java index e3b5ebd6..eb046505 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java @@ -77,13 +77,15 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy var reconciliationStatus = status.getReconciliationStatus(); // Nothing has been launched so skip observing - if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) { + if (reconciliationStatus.isFirstDeployment() + || reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) { return; } if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) { checkIfAlreadyUpgraded(flinkApp, context); if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) { + ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(flinkApp); return; } } @@ 
-263,9 +265,6 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy */ private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) { var status = flinkDep.getStatus(); - if (status.getReconciliationStatus().isFirstDeployment()) { - return; - } Optional<Deployment> depOpt = context.getSecondaryResource(Deployment.class); depOpt.ifPresent( deployment -> { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java index 5ba397c4..e3c57c4f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java @@ -118,6 +118,7 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> { if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) { checkIfAlreadyUpgraded(flinkSessionJob, deployedConfig); if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) { + ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(flinkSessionJob); return; } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java new file mode 100644 index 00000000..a20f56f5 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationMetadata.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.reconciler; + +import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource; + +import com.fasterxml.jackson.annotation.JsonInclude; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** Extra metadata to be attached to the reconciled spec. */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class ReconciliationMetadata { + + private String apiVersion; + + private ObjectMeta metadata; + + private boolean firstDeployment; + + public static ReconciliationMetadata from(AbstractFlinkResource<?, ?> resource) { + ObjectMeta metadata = new ObjectMeta(); + metadata.setGeneration(resource.getMetadata().getGeneration()); + + var firstDeploy = resource.getStatus().getReconciliationStatus().isFirstDeployment(); + + return new ReconciliationMetadata(resource.getApiVersion(), metadata, firstDeploy); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java index 79ee2dad..12dc633b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java +++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java @@ -268,8 +268,9 @@ public class ReconciliationUtils { * @return Tuple2 of spec and meta. * @param <T> Spec type. */ - public static <T extends AbstractFlinkSpec> Tuple2<T, ObjectNode> deserializeSpecWithMeta( - @Nullable String specWithMetaString, Class<T> specClass) { + public static <T extends AbstractFlinkSpec> + Tuple2<T, ReconciliationMetadata> deserializeSpecWithMeta( + @Nullable String specWithMetaString, Class<T> specClass) { if (specWithMetaString == null) { return null; } @@ -284,7 +285,8 @@ public class ReconciliationUtils { return Tuple2.of(objectMapper.treeToValue(wrapper, specClass), null); } else { return Tuple2.of( - objectMapper.treeToValue(wrapper.get("spec"), specClass), internalMeta); + objectMapper.treeToValue(wrapper.get("spec"), specClass), + objectMapper.convertValue(internalMeta, ReconciliationMetadata.class)); } } catch (JsonProcessingException e) { throw new RuntimeException("Could not deserialize spec, this indicates a bug...", e); @@ -300,29 +302,25 @@ public class ReconciliationUtils { */ public static String writeSpecWithMeta( AbstractFlinkSpec spec, AbstractFlinkResource<?, ?> relatedResource) { - - ObjectNode internalMeta = objectMapper.createObjectNode(); - - internalMeta.put("apiVersion", relatedResource.getApiVersion()); - ObjectNode metadata = internalMeta.putObject("metadata"); - metadata.put("generation", relatedResource.getMetadata().getGeneration()); - - return writeSpecWithMeta(spec, internalMeta); + return writeSpecWithMeta(spec, ReconciliationMetadata.from(relatedResource)); } /** * Serializes the spec and custom meta information into a JSON string. * * @param spec Flink resource spec. - * @param meta Custom meta object. + * @param metadata Reconciliation meta object. * @return Serialized json. 
*/ - public static String writeSpecWithMeta(AbstractFlinkSpec spec, ObjectNode meta) { + public static String writeSpecWithMeta( + AbstractFlinkSpec spec, ReconciliationMetadata metadata) { ObjectNode wrapper = objectMapper.createObjectNode(); wrapper.set("spec", objectMapper.valueToTree(Preconditions.checkNotNull(spec))); - wrapper.set(INTERNAL_METADATA_JSON_KEY, meta); + wrapper.set( + INTERNAL_METADATA_JSON_KEY, + objectMapper.valueToTree(Preconditions.checkNotNull(metadata))); try { return objectMapper.writeValueAsString(wrapper); @@ -435,7 +433,26 @@ public class ReconciliationUtils { return -1L; } - return lastSpecWithMeta.f1.get("metadata").get("generation").asLong(-1L); + return lastSpecWithMeta.f1.getMetadata().getGeneration(); + } + + /** + * Clear last reconciled spec if that corresponds to the first deployment. This is important in + * cases where the first deployment fails. + * + * @param resource Flink resource. + */ + public static void clearLastReconciledSpecIfFirstDeploy(AbstractFlinkResource<?, ?> resource) { + var reconStatus = resource.getStatus().getReconciliationStatus(); + var lastSpecWithMeta = reconStatus.deserializeLastReconciledSpecWithMeta(); + + if (lastSpecWithMeta.f1 == null) { + return; + } + + if (lastSpecWithMeta.f1.isFirstDeployment()) { + reconStatus.setLastReconciledSpec(null); + } } /** diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 866ecbd5..d35efb7d 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -70,6 +70,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static 
org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.params.provider.Arguments.arguments; /** {@link FlinkDeploymentController} tests. */ @@ -942,4 +943,19 @@ public class FlinkDeploymentControllerTest { } return args.stream(); } + + @Test + public void testInitialSavepointOnError() throws Exception { + FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster(); + flinkDeployment.getSpec().getJob().setInitialSavepointPath("msp"); + flinkService.setDeployFailure(true); + try { + testController.reconcile(flinkDeployment, context); + fail(); + } catch (Exception expected) { + } + flinkService.setDeployFailure(false); + testController.reconcile(flinkDeployment, context); + assertEquals("msp", flinkService.listJobs().get(0).f0); + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java index f3f93de6..b5d849bb 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java @@ -490,8 +490,23 @@ public class ApplicationObserverTest { status.getJobManagerDeploymentStatus()); var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta(); - assertEquals(321L, specWithMeta.f1.get("metadata").get("generation").asLong()); + assertEquals(321L, specWithMeta.f1.getMetadata().getGeneration()); assertEquals(JobState.RUNNING, specWithMeta.f0.getJob().getState()); assertEquals(5, specWithMeta.f0.getJob().getParallelism()); } + + @Test 
+ public void validateLastReconciledClearedOnInitialFailure() { + FlinkDeployment deployment = TestUtils.buildApplicationCluster(); + deployment.getMetadata().setGeneration(123L); + + ReconciliationUtils.updateStatusBeforeDeploymentAttempt( + deployment, + new FlinkConfigManager(new Configuration()) + .getDeployConfig(deployment.getMetadata(), deployment.getSpec())); + + assertFalse(deployment.getStatus().getReconciliationStatus().isFirstDeployment()); + observer.observe(deployment, TestUtils.createEmptyContext()); + assertTrue(deployment.getStatus().getReconciliationStatus().isFirstDeployment()); + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java index a9e06049..2a048ef3 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java @@ -148,7 +148,7 @@ public class SessionObserverTest { status.getJobManagerDeploymentStatus()); var specWithMeta = status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta(); - assertEquals(321L, specWithMeta.f1.get("metadata").get("generation").asLong()); + assertEquals(321L, specWithMeta.f1.getMetadata().getGeneration()); assertEquals("1", specWithMeta.f0.getFlinkConfiguration().get("k")); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java index e7270bd7..f7423c84 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java 
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java @@ -29,6 +29,7 @@ import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus; import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState; import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType; +import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; @@ -43,6 +44,9 @@ import org.junit.jupiter.api.Test; import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + /** Tests for {@link SessionJobObserver}. */ @EnableKubernetesMockClient(crud = true) public class SessionJobObserverTest { @@ -341,4 +345,16 @@ public class SessionJobObserverTest { Assertions.assertTrue( exception.getMessage().contains("doesn't match upgrade target generation")); } + + @Test + public void validateLastReconciledClearedOnInitialFailure() { + var sessionJob = TestUtils.buildSessionJob(); + sessionJob.getMetadata().setGeneration(123L); + + ReconciliationUtils.updateStatusBeforeDeploymentAttempt(sessionJob, new Configuration()); + + assertFalse(sessionJob.getStatus().getReconciliationStatus().isFirstDeployment()); + observer.observe(sessionJob, TestUtils.createContextWithReadyFlinkDeployment()); + assertTrue(sessionJob.getStatus().getReconciliationStatus().isFirstDeployment()); + } }