This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new a8fd1942 [FLINK-34438] Wait for Deployments to be deleted on cluster 
shutdown
a8fd1942 is described below

commit a8fd19429e93428e8a2498c32def24aa8ebbd4c4
Author: Máté Czagány <4469996+mateczag...@users.noreply.github.com>
AuthorDate: Tue Feb 20 12:17:54 2024 +0100

    [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown
---
 .../kubernetes_operator_config_configuration.html  |  2 +-
 .../shortcodes/generated/system_section.html       |  2 +-
 .../config/KubernetesOperatorConfigOptions.java    |  2 +-
 .../operator/service/AbstractFlinkService.java     | 85 ++++++++++------------
 .../operator/service/NativeFlinkService.java       |  6 ++
 .../operator/service/StandaloneFlinkService.java   |  8 ++
 .../kubernetes/operator/TestingFlinkService.java   |  5 ++
 .../operator/service/AbstractFlinkServiceTest.java | 67 +++++++++++++++++
 .../service/StandaloneFlinkServiceTest.java        | 14 +---
 9 files changed, 132 insertions(+), 59 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 00beffd0..d0680627 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -310,7 +310,7 @@
         </tr>
         <tr>
             <td><h5>kubernetes.operator.resource.cleanup.timeout</h5></td>
-            <td style="word-wrap: break-word;">1 min</td>
+            <td style="word-wrap: break-word;">5 min</td>
             <td>Duration</td>
             <td>The timeout for the resource clean up to wait for flink to 
shutdown cluster.</td>
         </tr>
diff --git a/docs/layouts/shortcodes/generated/system_section.html 
b/docs/layouts/shortcodes/generated/system_section.html
index 9a0350e6..aa053c2f 100644
--- a/docs/layouts/shortcodes/generated/system_section.html
+++ b/docs/layouts/shortcodes/generated/system_section.html
@@ -106,7 +106,7 @@
         </tr>
         <tr>
             <td><h5>kubernetes.operator.resource.cleanup.timeout</h5></td>
-            <td style="word-wrap: break-word;">1 min</td>
+            <td style="word-wrap: break-word;">5 min</td>
             <td>Duration</td>
             <td>The timeout for the resource clean up to wait for flink to 
shutdown cluster.</td>
         </tr>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 62e3ba3e..1334ba4f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -127,7 +127,7 @@ public class KubernetesOperatorConfigOptions {
     public static final ConfigOption<Duration> 
OPERATOR_RESOURCE_CLEANUP_TIMEOUT =
             operatorConfig("resource.cleanup.timeout")
                     .durationType()
-                    .defaultValue(Duration.ofSeconds(60))
+                    .defaultValue(Duration.ofMinutes(5))
                     .withDeprecatedKeys(
                             
operatorConfigKey("reconciler.flink.cluster.shutdown.timeout"))
                     .withDescription(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 2fafb027..101480da 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -101,8 +101,8 @@ import org.apache.flink.util.Preconditions;
 import io.fabric8.kubernetes.api.model.DeletionPropagation;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.PodList;
-import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
 import org.apache.commons.lang3.ObjectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -125,6 +125,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -898,58 +899,50 @@ public abstract class AbstractFlinkService implements 
FlinkService {
         }
     }
 
-    /** Wait until the FLink cluster has completely shut down. */
-    @VisibleForTesting
-    void waitForClusterShutdown(String namespace, String clusterId, long 
shutdownTimeout) {
-        LOG.info("Waiting for cluster shutdown...");
-
-        boolean jobManagerRunning = true;
-        boolean taskManagerRunning = true;
-        boolean serviceRunning = true;
+    /** Returns a list of Kubernetes Deployment names for given cluster. */
+    protected abstract List<String> getDeploymentNames(String namespace, 
String clusterId);
 
-        for (int i = 0; i < shutdownTimeout; i++) {
-            if (jobManagerRunning) {
-                PodList jmPodList = getJmPodList(namespace, clusterId);
+    /** Wait until the Flink cluster has completely shut down. */
+    protected void waitForClusterShutdown(
+            String namespace, String clusterId, long shutdownTimeout) {
+        long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000;
+        LOG.info("Waiting {} seconds for cluster shutdown...", 
shutdownTimeout);
 
-                if (jmPodList == null || jmPodList.getItems().isEmpty()) {
-                    jobManagerRunning = false;
-                }
-            }
-            if (taskManagerRunning) {
-                PodList tmPodList = getTmPodList(namespace, clusterId);
+        for (var deploymentName : getDeploymentNames(namespace, clusterId)) {
+            long deploymentTimeout = timeoutAt - System.currentTimeMillis();
 
-                if (tmPodList.getItems().isEmpty()) {
-                    taskManagerRunning = false;
-                }
+            if (!waitForDeploymentToBeRemoved(namespace, deploymentName, 
deploymentTimeout)) {
+                LOG.error(
+                        "Failed to shut down cluster {} (deployment {}) in {} 
seconds, proceeding...",
+                        clusterId,
+                        deploymentName,
+                        shutdownTimeout);
+                return;
             }
+        }
+    }
 
-            if (serviceRunning) {
-                Service service =
-                        kubernetesClient
-                                .services()
-                                .inNamespace(namespace)
-                                .withName(
-                                        
ExternalServiceDecorator.getExternalServiceName(clusterId))
-                                .get();
-                if (service == null) {
-                    serviceRunning = false;
-                }
-            }
+    /** Wait until Deployment is removed, return false if timed out, otherwise 
return true. */
+    @VisibleForTesting
+    boolean waitForDeploymentToBeRemoved(String namespace, String 
deploymentName, long timeout) {
+        LOG.info(
+                "Waiting for Deployment {} to shut down with {} seconds 
timeout...",
+                deploymentName,
+                timeout / 1000);
 
-            if (!jobManagerRunning && !serviceRunning && !taskManagerRunning) {
-                break;
-            }
-            // log a message waiting to shutdown Flink cluster every 5 seconds.
-            if ((i + 1) % 5 == 0) {
-                LOG.info("Waiting for cluster shutdown... ({}s)", i + 1);
-            }
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
+        try {
+            kubernetesClient
+                    .apps()
+                    .deployments()
+                    .inNamespace(namespace)
+                    .withName(deploymentName)
+                    .waitUntilCondition(Objects::isNull, timeout, 
TimeUnit.MILLISECONDS);
+
+            LOG.info("Deployment {} successfully shut down", deploymentName);
+        } catch (KubernetesClientTimeoutException e) {
+            return false;
         }
-        LOG.info("Cluster shutdown completed.");
+        return true;
     }
 
     private static List<JobStatusMessage> toJobStatusMessage(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 52bb398e..fe5cde06 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -62,6 +62,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -132,6 +133,11 @@ public class NativeFlinkService extends 
AbstractFlinkService {
         return new PodList();
     }
 
+    @Override
+    protected List<String> getDeploymentNames(String namespace, String 
clusterId) {
+        return List.of(KubernetesUtils.getDeploymentName(clusterId));
+    }
+
     protected void submitClusterInternal(Configuration conf) throws Exception {
         LOG.info("Deploying session cluster");
         final ClusterClientServiceLoader clusterClientServiceLoader =
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index c0439d04..0c3e7477 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -47,6 +47,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -104,6 +105,13 @@ public class StandaloneFlinkService extends 
AbstractFlinkService {
                 .list();
     }
 
+    @Override
+    protected List<String> getDeploymentNames(String namespace, String 
clusterId) {
+        return List.of(
+                
StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId),
+                
StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId));
+    }
+
     @VisibleForTesting
     protected FlinkStandaloneKubeClient 
createNamespacedKubeClient(Configuration configuration) {
         final int poolSize =
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 14fa2a9d..d2ad412a 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -628,4 +628,9 @@ public class TestingFlinkService extends 
AbstractFlinkService {
     public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) {
         return NativeFlinkServiceTest.createJobDetailsFor(List.of());
     }
+
+    @Override
+    protected List<String> getDeploymentNames(String namespace, String 
clusterId) {
+        return List.of(clusterId + "-deployment");
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 637e3bed..a2b09647 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -95,10 +95,17 @@ import org.apache.flink.util.function.TriFunction;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import io.fabric8.kubernetes.api.model.DeletionPropagation;
+import io.fabric8.kubernetes.api.model.ListMeta;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.WatchEvent;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+import io.fabric8.kubernetes.api.model.apps.DeploymentList;
+import io.fabric8.kubernetes.api.model.apps.DeploymentListBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -108,6 +115,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.HttpURLConnection;
 import java.net.ServerSocket;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -144,6 +152,7 @@ public class AbstractFlinkServiceTest {
     File testJar;
 
     private KubernetesClient client;
+    private KubernetesMockServer mockServer;
     private final Configuration configuration = new Configuration();
 
     private final FlinkConfigManager configManager = new 
FlinkConfigManager(configuration);
@@ -1046,6 +1055,59 @@ public class AbstractFlinkServiceTest {
                 });
     }
 
+    @Test
+    public void testWaitForClusterShutdown() {
+        String deploymentName = "test-cluster";
+        String namespace = "test-namespace";
+        String getUrl =
+                String.format(
+                        
"/apis/apps/v1/namespaces/%s/deployments?fieldSelector=metadata.name%%3D%s",
+                        namespace, deploymentName);
+        String watchUrl =
+                String.format(
+                        
"/apis/apps/v1/namespaces/%s/deployments?fieldSelector=metadata.name%%3D%s&timeoutSeconds=600&allowWatchBookmarks=true&watch=true",
+                        namespace, deploymentName);
+
+        var flinkService = new TestingService(null);
+
+        Deployment deployment =
+                new DeploymentBuilder()
+                        .withNewMetadata()
+                        .withName(deploymentName)
+                        .withNamespace(namespace)
+                        .endMetadata()
+                        .build();
+
+        DeploymentList deploymentList =
+                new DeploymentListBuilder()
+                        .withMetadata(new ListMeta())
+                        .withItems(deployment)
+                        .build();
+
+        mockServer
+                .expect()
+                .get()
+                .withPath(getUrl)
+                .andReturn(HttpURLConnection.HTTP_OK, deploymentList)
+                .once();
+        mockServer
+                .expect()
+                .get()
+                .withPath(watchUrl)
+                .andUpgradeToWebSocket()
+                .open()
+                .waitFor(10)
+                .andEmit(new WatchEvent(deployment, "DELETED"))
+                .done()
+                .always();
+
+        boolean result =
+                flinkService.waitForDeploymentToBeRemoved(namespace, 
deploymentName, 10000);
+
+        assertTrue(result);
+        assertEquals(2, mockServer.getRequestCount());
+    }
+
     class TestingService extends AbstractFlinkService {
 
         RestClusterClient<String> clusterClient;
@@ -1089,6 +1151,11 @@ public class AbstractFlinkServiceTest {
             return tmPods.getOrDefault(Tuple2.of(namespace, clusterId), new 
PodList());
         }
 
+        @Override
+        protected List<String> getDeploymentNames(String namespace, String 
clusterId) {
+            return List.of(clusterId);
+        }
+
         @Override
         protected void deployApplicationCluster(JobSpec jobSpec, Configuration 
conf) {
             throw new UnsupportedOperationException();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
index f583df92..7c1e2175 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -33,8 +33,6 @@ import 
org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
 import org.apache.flink.util.concurrent.Executors;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
@@ -121,7 +119,8 @@ public class StandaloneFlinkServiceTest {
                 new Configuration(),
                 true);
 
-        assertEquals(2, service.nbCall);
+        // Number of times getDeploymentNames() was called
+        assertEquals(1, service.nbCall);
 
         deployments = kubernetesClient.apps().deployments().list().getItems();
 
@@ -290,14 +289,9 @@ public class StandaloneFlinkServiceTest {
         }
 
         @Override
-        protected PodList getTmPodList(String namespace, String clusterId) {
+        protected List<String> getDeploymentNames(String namespace, String 
clusterId) {
             nbCall++;
-            PodList podList = new PodList();
-            if (nbCall == 1) {
-                Pod pod = new Pod();
-                podList.setItems(List.of(pod));
-            }
-            return podList;
+            return List.of(clusterId);
         }
     }
 

Reply via email to