This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new a8fd1942 [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown a8fd1942 is described below commit a8fd19429e93428e8a2498c32def24aa8ebbd4c4 Author: Máté Czagány <4469996+mateczag...@users.noreply.github.com> AuthorDate: Tue Feb 20 12:17:54 2024 +0100 [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown --- .../kubernetes_operator_config_configuration.html | 2 +- .../shortcodes/generated/system_section.html | 2 +- .../config/KubernetesOperatorConfigOptions.java | 2 +- .../operator/service/AbstractFlinkService.java | 85 ++++++++++------------ .../operator/service/NativeFlinkService.java | 6 ++ .../operator/service/StandaloneFlinkService.java | 8 ++ .../kubernetes/operator/TestingFlinkService.java | 5 ++ .../operator/service/AbstractFlinkServiceTest.java | 67 +++++++++++++++++ .../service/StandaloneFlinkServiceTest.java | 14 +--- 9 files changed, 132 insertions(+), 59 deletions(-) diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html index 00beffd0..d0680627 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html @@ -310,7 +310,7 @@ </tr> <tr> <td><h5>kubernetes.operator.resource.cleanup.timeout</h5></td> - <td style="word-wrap: break-word;">1 min</td> + <td style="word-wrap: break-word;">5 min</td> <td>Duration</td> <td>The timeout for the resource clean up to wait for flink to shutdown cluster.</td> </tr> diff --git a/docs/layouts/shortcodes/generated/system_section.html b/docs/layouts/shortcodes/generated/system_section.html index 9a0350e6..aa053c2f 100644 --- a/docs/layouts/shortcodes/generated/system_section.html +++ b/docs/layouts/shortcodes/generated/system_section.html @@ -106,7 +106,7 @@ </tr> <tr> <td><h5>kubernetes.operator.resource.cleanup.timeout</h5></td> - <td style="word-wrap: break-word;">1 min</td> + <td style="word-wrap: break-word;">5 min</td> <td>Duration</td> <td>The timeout for the resource clean up to wait for flink to shutdown cluster.</td> </tr> diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index 62e3ba3e..1334ba4f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -127,7 +127,7 @@ public class KubernetesOperatorConfigOptions { public static final ConfigOption<Duration> OPERATOR_RESOURCE_CLEANUP_TIMEOUT = operatorConfig("resource.cleanup.timeout") .durationType() - .defaultValue(Duration.ofSeconds(60)) + .defaultValue(Duration.ofMinutes(5)) .withDeprecatedKeys( operatorConfigKey("reconciler.flink.cluster.shutdown.timeout")) .withDescription( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 2fafb027..101480da 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -101,8 +101,8 @@ import org.apache.flink.util.Preconditions; import io.fabric8.kubernetes.api.model.DeletionPropagation; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.PodList; -import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientTimeoutException; import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,6 +125,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -898,58 +899,50 @@ public abstract class AbstractFlinkService implements FlinkService { } } - /** Wait until the FLink cluster has completely shut down. */ - @VisibleForTesting - void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) { - LOG.info("Waiting for cluster shutdown..."); - - boolean jobManagerRunning = true; - boolean taskManagerRunning = true; - boolean serviceRunning = true; + /** Returns a list of Kubernetes Deployment names for given cluster. */ + protected abstract List<String> getDeploymentNames(String namespace, String clusterId); - for (int i = 0; i < shutdownTimeout; i++) { - if (jobManagerRunning) { - PodList jmPodList = getJmPodList(namespace, clusterId); + /** Wait until the FLink cluster has completely shut down. */ + protected void waitForClusterShutdown( + String namespace, String clusterId, long shutdownTimeout) { + long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000; + LOG.info("Waiting {} seconds for cluster shutdown...", shutdownTimeout); - if (jmPodList == null || jmPodList.getItems().isEmpty()) { - jobManagerRunning = false; - } - } - if (taskManagerRunning) { - PodList tmPodList = getTmPodList(namespace, clusterId); + for (var deploymentName : getDeploymentNames(namespace, clusterId)) { + long deploymentTimeout = timeoutAt - System.currentTimeMillis(); - if (tmPodList.getItems().isEmpty()) { - taskManagerRunning = false; - } + if (!waitForDeploymentToBeRemoved(namespace, deploymentName, deploymentTimeout)) { + LOG.error( + "Failed to shut down cluster {} (deployment {}) in {} seconds, proceeding...", + clusterId, + deploymentName, + shutdownTimeout); + return; } + } + } - if (serviceRunning) { - Service service = - kubernetesClient - .services() - .inNamespace(namespace) - .withName( - ExternalServiceDecorator.getExternalServiceName(clusterId)) - .get(); - if (service == null) { - serviceRunning = false; - } - } + /** Wait until Deployment is removed, return false if timed out, otherwise return true. */ + @VisibleForTesting + boolean waitForDeploymentToBeRemoved(String namespace, String deploymentName, long timeout) { + LOG.info( + "Waiting for Deployment {} to shut down with {} seconds timeout...", + deploymentName, + timeout / 1000); - if (!jobManagerRunning && !serviceRunning && !taskManagerRunning) { - break; - } - // log a message waiting to shutdown Flink cluster every 5 seconds. - if ((i + 1) % 5 == 0) { - LOG.info("Waiting for cluster shutdown... ({}s)", i + 1); - } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + try { + kubernetesClient + .apps() + .deployments() + .inNamespace(namespace) + .withName(deploymentName) + .waitUntilCondition(Objects::isNull, timeout, TimeUnit.MILLISECONDS); + + LOG.info("Deployment {} successfully shut down", deploymentName); + } catch (KubernetesClientTimeoutException e) { + return false; } - LOG.info("Cluster shutdown completed."); + return true; } private static List<JobStatusMessage> toJobStatusMessage( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java index 52bb398e..fe5cde06 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java @@ -62,6 +62,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -132,6 +133,11 @@ public class NativeFlinkService extends AbstractFlinkService { return new PodList(); } + @Override + protected List<String> getDeploymentNames(String namespace, String clusterId) { + return List.of(KubernetesUtils.getDeploymentName(clusterId)); + } + protected void submitClusterInternal(Configuration conf) throws Exception { LOG.info("Deploying session cluster"); final ClusterClientServiceLoader clusterClientServiceLoader = diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java index c0439d04..0c3e7477 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java @@ -47,6 +47,7 @@ import io.fabric8.kubernetes.client.KubernetesClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -104,6 +105,13 @@ public class StandaloneFlinkService extends AbstractFlinkService { .list(); } + @Override + protected List<String> getDeploymentNames(String namespace, String clusterId) { + return List.of( + StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId), + StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId)); + } + @VisibleForTesting protected FlinkStandaloneKubeClient createNamespacedKubeClient(Configuration configuration) { final int poolSize = diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 14fa2a9d..d2ad412a 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -628,4 +628,9 @@ public class TestingFlinkService extends AbstractFlinkService { public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) { return NativeFlinkServiceTest.createJobDetailsFor(List.of()); } + + @Override + protected List<String> getDeploymentNames(String namespace, String clusterId) { + return List.of(clusterId + "-deployment"); + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java index 637e3bed..a2b09647 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java @@ -95,10 +95,17 @@ import org.apache.flink.util.function.TriFunction; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.DeletionPropagation; +import io.fabric8.kubernetes.api.model.ListMeta; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.WatchEvent; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; +import io.fabric8.kubernetes.api.model.apps.DeploymentList; +import io.fabric8.kubernetes.api.model.apps.DeploymentListBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -108,6 +115,7 @@ import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.io.IOException; +import java.net.HttpURLConnection; import java.net.ServerSocket; import java.nio.file.Files; import java.nio.file.Path; @@ -144,6 +152,7 @@ public class AbstractFlinkServiceTest { File testJar; private KubernetesClient client; + private KubernetesMockServer mockServer; private final Configuration configuration = new Configuration(); private final FlinkConfigManager configManager = new FlinkConfigManager(configuration); @@ -1046,6 +1055,59 @@ public class AbstractFlinkServiceTest { }); } + @Test + public void testWaitForClusterShutdown() { + String deploymentName = "test-cluster"; + String namespace = "test-namespace"; + String getUrl = + String.format( + "/apis/apps/v1/namespaces/%s/deployments?fieldSelector=metadata.name%%3D%s", + namespace, deploymentName); + String watchUrl = + String.format( + "/apis/apps/v1/namespaces/%s/deployments?fieldSelector=metadata.name%%3D%s&timeoutSeconds=600&allowWatchBookmarks=true&watch=true", + namespace, deploymentName); + + var flinkService = new TestingService(null); + + Deployment deployment = + new DeploymentBuilder() + .withNewMetadata() + .withName(deploymentName) + .withNamespace(namespace) + .endMetadata() + .build(); + + DeploymentList deploymentList = + new DeploymentListBuilder() + .withMetadata(new ListMeta()) + .withItems(deployment) + .build(); + + mockServer + .expect() + .get() + .withPath(getUrl) + .andReturn(HttpURLConnection.HTTP_OK, deploymentList) + .once(); + mockServer + .expect() + .get() + .withPath(watchUrl) + .andUpgradeToWebSocket() + .open() + .waitFor(10) + .andEmit(new WatchEvent(deployment, "DELETED")) + .done() + .always(); + + boolean result = + flinkService.waitForDeploymentToBeRemoved(namespace, deploymentName, 10000); + + assertTrue(result); + assertEquals(2, mockServer.getRequestCount()); + } + class TestingService extends AbstractFlinkService { RestClusterClient<String> clusterClient; @@ -1089,6 +1151,11 @@ public class AbstractFlinkServiceTest { return tmPods.getOrDefault(Tuple2.of(namespace, clusterId), new PodList()); } + @Override + protected List<String> getDeploymentNames(String namespace, String clusterId) { + return List.of(clusterId); + } + @Override protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) { throw new UnsupportedOperationException(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java index f583df92..7c1e2175 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java @@ -33,8 +33,6 @@ import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils; import org.apache.flink.util.concurrent.Executors; import io.fabric8.kubernetes.api.model.ObjectMeta; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; import io.fabric8.kubernetes.client.NamespacedKubernetesClient; @@ -121,7 +119,8 @@ public class StandaloneFlinkServiceTest { new Configuration(), true); - assertEquals(2, service.nbCall); + // How many times were getDeploymentNames() called + assertEquals(1, service.nbCall); deployments = kubernetesClient.apps().deployments().list().getItems(); @@ -290,14 +289,9 @@ public class StandaloneFlinkServiceTest { } @Override - protected PodList getTmPodList(String namespace, String clusterId) { + protected List<String> getDeploymentNames(String namespace, String clusterId) { nbCall++; - PodList podList = new PodList(); - if (nbCall == 1) { - Pod pod = new Pod(); - podList.setItems(List.of(pod)); - } - return podList; + return List.of(clusterId); } }