This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 7db0eb1e [FLINK-30004] Cleanup deployment after savepoint for Flink versions < 1.15
7db0eb1e is described below

commit 7db0eb1e02a1430128f436bdecea9748ce81fe14
Author: Thomas Weise <t...@apache.org>
AuthorDate: Sun Nov 13 15:56:41 2022 -0500

    [FLINK-30004] Cleanup deployment after savepoint for Flink versions < 1.15
---
 .../operator/service/AbstractFlinkService.java        |  1 +
 .../operator/service/NativeFlinkService.java          |  7 ++++++-
 .../operator/service/NativeFlinkServiceTest.java      | 19 +++++++++++++++++--
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 38b32d04..2208cf80 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -326,6 +326,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                                 exception);
                     }
                     if (deleteClusterAfterSavepoint) {
+                        LOG.info("Cleaning up deployment after 
stop-with-savepoint");
                         deleteClusterDeployment(deployment.getMetadata(), 
deploymentStatus, true);
                     }
                     break;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index f40322ff..1402c8d3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import 
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
@@ -80,7 +81,11 @@ public class NativeFlinkService extends AbstractFlinkService 
{
     public void cancelJob(
             FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration 
configuration)
             throws Exception {
-        cancelJob(deployment, upgradeMode, configuration, false);
+        // prior to Flink 1.15, ensure removal of orphaned config maps
+        // https://issues.apache.org/jira/browse/FLINK-30004
+        boolean deleteClusterAfterSavepoint =
+                
!deployment.getSpec().getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_14);
+        cancelJob(deployment, upgradeMode, configuration, 
deleteClusterAfterSavepoint);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index c79663ef..fb204c1a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -56,6 +56,8 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -114,8 +116,9 @@ public class NativeFlinkServiceTest {
         assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
     }
 
-    @Test
-    public void testCancelJobWithSavepointUpgradeMode() throws Exception {
+    @ParameterizedTest
+    @EnumSource(FlinkVersion.class)
+    public void testCancelJobWithSavepointUpgradeMode(FlinkVersion 
flinkVersion) throws Exception {
         final TestingClusterClient<String> testingClusterClient =
                 new TestingClusterClient<>(configuration, 
TestUtils.TEST_DEPLOYMENT_NAME);
         final CompletableFuture<Tuple3<JobID, Boolean, String>> 
stopWithSavepointFuture =
@@ -143,6 +146,7 @@ public class NativeFlinkServiceTest {
         
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
 
+        deployment.getSpec().setFlinkVersion(flinkVersion);
         flinkService.cancelJob(
                 deployment, UpgradeMode.SAVEPOINT, 
configManager.getObserveConfig(deployment));
         assertTrue(stopWithSavepointFuture.isDone());
@@ -150,6 +154,17 @@ public class NativeFlinkServiceTest {
         assertFalse(stopWithSavepointFuture.get().f1);
         assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
         assertEquals(savepointPath, 
jobStatus.getSavepointInfo().getLastSavepoint().getLocation());
+
+        assertEquals(jobStatus.getState(), 
org.apache.flink.api.common.JobStatus.FINISHED.name());
+        if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) {
+            assertEquals(
+                    deployment.getStatus().getJobManagerDeploymentStatus(),
+                    JobManagerDeploymentStatus.READY);
+        } else {
+            assertEquals(
+                    deployment.getStatus().getJobManagerDeploymentStatus(),
+                    JobManagerDeploymentStatus.MISSING);
+        }
     }
 
     @Test

Reply via email to