This is an automated email from the ASF dual-hosted git repository.

nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 9af45c7  Improving Kubernetes scheduler logic (#3653)
9af45c7 is described below

commit 9af45c730e5fea3ec18c1459970e6c69f7b6b7b0
Author: Nicholas Nezis <[email protected]>
AuthorDate: Sun Jan 3 20:59:50 2021 -0500

    Improving Kubernetes scheduler logic (#3653)
    
    * Added support for HTTP_NOT_FOUND response code
    * Updated to use try-with-resources logic for Response cleanup
    * More cleanup. Now throwing TopologyRuntimeManagementException in more places
---
 WORKSPACE                                          |  3 +-
 .../heron/scheduler/kubernetes/V1Controller.java   | 84 ++++++++++++++--------
 2 files changed, 55 insertions(+), 32 deletions(-)

diff --git a/WORKSPACE b/WORKSPACE
index 8e46253..f6c7a55 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -54,7 +54,7 @@ jetty_version = "9.4.6.v20170531"
 
 jersey_version = "2.25.1"
 
-kubernetes_client_version = "8.0.0"
+kubernetes_client_version = "11.0.0"
 
 load("@rules_jvm_external//:defs.bzl", "maven_install")
 load("@rules_jvm_external//:specs.bzl", "maven")
@@ -263,6 +263,7 @@ http_archive(
 http_archive(
     name = "org_apache_zookeeper",
     build_file = "@//:third_party/zookeeper/BUILD",
+    sha256 = "bafc0abe7da696a2020ba11b8ce7d06f6e28e9bf1e5504de09be25b8b589777d",
     strip_prefix = "apache-zookeeper-3.5.8",
     urls = ["https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8.tar.gz"],
 )
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index f4efe2c..0fd62b0 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -70,6 +70,8 @@ import io.kubernetes.client.util.PatchUtils;
 
 import okhttp3.Response;
 
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+
 public class V1Controller extends KubernetesController {
 
   private static final Logger LOG =
@@ -219,54 +221,74 @@ public class V1Controller extends KubernetesController {
         null, null, null);
   }
 
-  boolean deleteService() {
-    try {
-      final Response response = 
coreClient.deleteNamespacedServiceCall(getTopologyName(),
+  void deleteService() {
+    try (Response response = 
coreClient.deleteNamespacedServiceCall(getTopologyName(),
           getNamespace(), null, null, 0, null,
-          KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, 
null).execute();
-
-      if (response.isSuccessful()) {
-        LOG.log(Level.INFO, "Headless Service for the Job [" + 
getTopologyName()
-            + "] in namespace [" + getNamespace() + "] is deleted.");
-        return true;
-      } else {
+          KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, 
null).execute()) {
+
+      if (!response.isSuccessful()) {
+        if (response.code() == HTTP_NOT_FOUND) {
+          LOG.log(Level.WARNING, "Deleting non-existent Kubernetes headless service for Topology: "
+                  + getTopologyName());
+          return;
+        }
         LOG.log(Level.SEVERE, "Error when deleting the Service of the job ["
-            + getTopologyName() + "] in namespace [" + getNamespace() + "]");
-        LOG.log(Level.SEVERE, "Error killing topoogy message:" + 
response.message());
+                + getTopologyName() + "] in namespace [" + getNamespace() + 
"]");
+        LOG.log(Level.SEVERE, "Error killing topology message:" + 
response.message());
         KubernetesUtils.logResponseBodyIfPresent(LOG, response);
 
         throw new TopologyRuntimeManagementException(
-            KubernetesUtils.errorMessageFromResponse(response));
+                KubernetesUtils.errorMessageFromResponse(response));
       }
-    } catch (IOException | ApiException e) {
-      KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology 
service", e);
-      return false;
+    } catch (ApiException e) {
+      if (e.getCode() == HTTP_NOT_FOUND) {
+        LOG.log(Level.WARNING, "Tried to delete a non-existent Kubernetes service for Topology: "
+                + getTopologyName());
+        return;
+      }
+      throw new TopologyRuntimeManagementException("Error deleting topology ["
+              + getTopologyName() + "] Kubernetes service", e);
+    } catch (IOException e) {
+      throw new TopologyRuntimeManagementException("Error deleting topology ["
+              + getTopologyName() + "] Kubernetes service", e);
     }
+    LOG.log(Level.INFO, "Headless Service for the Job [" + getTopologyName()
+            + "] in namespace [" + getNamespace() + "] is deleted.");
   }
 
-  boolean deleteStatefulSet() {
-    try {
-      final Response response = 
appsClient.deleteNamespacedStatefulSetCall(getTopologyName(),
+  void deleteStatefulSet() {
+    try (Response response = 
appsClient.deleteNamespacedStatefulSetCall(getTopologyName(),
           getNamespace(), null, null, 0, null,
-          KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, 
null).execute();
-
-      if (response.isSuccessful()) {
-        LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
-            + "] in namespace [" + getNamespace() + "] is deleted.");
-        return true;
-      } else {
+          KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, 
null).execute()) {
+
+      if (!response.isSuccessful()) {
+        if (response.code() == HTTP_NOT_FOUND) {
+          LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: "
+                  + getTopologyName());
+          return;
+        }
         LOG.log(Level.SEVERE, "Error when deleting the StatefulSet of the job ["
-            + getTopologyName() + "] in namespace [" + getNamespace() + "]");
+                + getTopologyName() + "] in namespace [" + getNamespace() + 
"]");
         LOG.log(Level.SEVERE, "Error killing topology message: " + 
response.message());
         KubernetesUtils.logResponseBodyIfPresent(LOG, response);
 
         throw new TopologyRuntimeManagementException(
-            KubernetesUtils.errorMessageFromResponse(response));
+                KubernetesUtils.errorMessageFromResponse(response));
       }
-    } catch (IOException | ApiException e) {
-      KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology", 
e);
-      return false;
+    } catch (ApiException e) {
+      if (e.getCode() == HTTP_NOT_FOUND) {
+        LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: "
+                + getTopologyName());
+        return;
+      }
+      throw new TopologyRuntimeManagementException("Error deleting topology ["
+              + getTopologyName() + "] Kubernetes StatefulSet", e);
+    } catch (IOException e) {
+      throw new TopologyRuntimeManagementException("Error deleting topology ["
+              + getTopologyName() + "] Kubernetes StatefulSet", e);
     }
+    LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
+            + "] in namespace [" + getNamespace() + "] is deleted.");
   }
 
   protected List<String> getExecutorCommand(String containerId) {

Reply via email to