This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 7db0eb1e [FLINK-30004] Cleanup deployment after savepoint for Flink versions < 1.15 7db0eb1e is described below commit 7db0eb1e02a1430128f436bdecea9748ce81fe14 Author: Thomas Weise <t...@apache.org> AuthorDate: Sun Nov 13 15:56:41 2022 -0500 [FLINK-30004] Cleanup deployment after savepoint for Flink versions < 1.15 --- .../operator/service/AbstractFlinkService.java | 1 + .../operator/service/NativeFlinkService.java | 7 ++++++- .../operator/service/NativeFlinkServiceTest.java | 19 +++++++++++++++++-- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 38b32d04..2208cf80 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -326,6 +326,7 @@ public abstract class AbstractFlinkService implements FlinkService { exception); } if (deleteClusterAfterSavepoint) { + LOG.info("Cleaning up deployment after stop-with-savepoint"); deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true); } break; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java index f40322ff..1402c8d3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java @@ -27,6 +27,7 @@ import 
org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; import org.apache.flink.kubernetes.operator.api.spec.JobSpec; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; @@ -80,7 +81,11 @@ public class NativeFlinkService extends AbstractFlinkService { public void cancelJob( FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration) throws Exception { - cancelJob(deployment, upgradeMode, configuration, false); + // prior to Flink 1.15, ensure removal of orphaned config maps + // https://issues.apache.org/jira/browse/FLINK-30004 + boolean deleteClusterAfterSavepoint = + !deployment.getSpec().getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14); + cancelJob(deployment, upgradeMode, configuration, deleteClusterAfterSavepoint); } @Override diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java index c79663ef..fb204c1a 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java @@ -56,6 +56,8 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; 
import java.util.ArrayList; import java.util.Arrays; @@ -114,8 +116,9 @@ public class NativeFlinkServiceTest { assertNull(jobStatus.getSavepointInfo().getLastSavepoint()); } - @Test - public void testCancelJobWithSavepointUpgradeMode() throws Exception { + @ParameterizedTest + @EnumSource(FlinkVersion.class) + public void testCancelJobWithSavepointUpgradeMode(FlinkVersion flinkVersion) throws Exception { final TestingClusterClient<String> testingClusterClient = new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); final CompletableFuture<Tuple3<JobID, Boolean, String>> stopWithSavepointFuture = @@ -143,6 +146,7 @@ public class NativeFlinkServiceTest { jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name()); ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration()); + deployment.getSpec().setFlinkVersion(flinkVersion); flinkService.cancelJob( deployment, UpgradeMode.SAVEPOINT, configManager.getObserveConfig(deployment)); assertTrue(stopWithSavepointFuture.isDone()); @@ -150,6 +154,17 @@ public class NativeFlinkServiceTest { assertFalse(stopWithSavepointFuture.get().f1); assertEquals(savepointPath, stopWithSavepointFuture.get().f2); assertEquals(savepointPath, jobStatus.getSavepointInfo().getLastSavepoint().getLocation()); + + assertEquals(jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED.name()); + if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) { + assertEquals( + deployment.getStatus().getJobManagerDeploymentStatus(), + JobManagerDeploymentStatus.READY); + } else { + assertEquals( + deployment.getStatus().getJobManagerDeploymentStatus(), + JobManagerDeploymentStatus.MISSING); + } } @Test