This is an automated email from the ASF dual-hosted git repository.
nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 9af45c7 Improving Kubernetes scheduler logic (#3653)
9af45c7 is described below
commit 9af45c730e5fea3ec18c1459970e6c69f7b6b7b0
Author: Nicholas Nezis <[email protected]>
AuthorDate: Sun Jan 3 20:59:50 2021 -0500
Improving Kubernetes scheduler logic (#3653)
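* Treat an HTTP_NOT_FOUND (404) result when deleting the topology's Service or StatefulSet as a logged warning instead of a failure
* Use try-with-resources so the okhttp Response is always closed after the delete call
* More cleanup: deleteService() and deleteStatefulSet() now return void and throw TopologyRuntimeManagementException on failure instead of returning false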
---
WORKSPACE | 3 +-
.../heron/scheduler/kubernetes/V1Controller.java | 84 ++++++++++++++--------
2 files changed, 55 insertions(+), 32 deletions(-)
diff --git a/WORKSPACE b/WORKSPACE
index 8e46253..f6c7a55 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -54,7 +54,7 @@ jetty_version = "9.4.6.v20170531"
jersey_version = "2.25.1"
-kubernetes_client_version = "8.0.0"
+kubernetes_client_version = "11.0.0"
load("@rules_jvm_external//:defs.bzl", "maven_install")
load("@rules_jvm_external//:specs.bzl", "maven")
@@ -263,6 +263,7 @@ http_archive(
http_archive(
name = "org_apache_zookeeper",
build_file = "@//:third_party/zookeeper/BUILD",
+ sha256 =
"bafc0abe7da696a2020ba11b8ce7d06f6e28e9bf1e5504de09be25b8b589777d",
strip_prefix = "apache-zookeeper-3.5.8",
urls =
["https://archive.apache.org/dist/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8.tar.gz"],
)
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
index f4efe2c..0fd62b0 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
@@ -70,6 +70,8 @@ import io.kubernetes.client.util.PatchUtils;
import okhttp3.Response;
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+
public class V1Controller extends KubernetesController {
private static final Logger LOG =
@@ -219,54 +221,74 @@ public class V1Controller extends KubernetesController {
null, null, null);
}
- boolean deleteService() {
- try {
- final Response response =
coreClient.deleteNamespacedServiceCall(getTopologyName(),
+ void deleteService() {
+ try (Response response =
coreClient.deleteNamespacedServiceCall(getTopologyName(),
getNamespace(), null, null, 0, null,
- KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null,
null).execute();
-
- if (response.isSuccessful()) {
- LOG.log(Level.INFO, "Headless Service for the Job [" +
getTopologyName()
- + "] in namespace [" + getNamespace() + "] is deleted.");
- return true;
- } else {
+ KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null,
null).execute()) {
+
+ if (!response.isSuccessful()) {
+ if (response.code() == HTTP_NOT_FOUND) {
+ LOG.log(Level.WARNING, "Deleting non-existent Kubernetes headless
service for Topology: "
+ + getTopologyName());
+ return;
+ }
LOG.log(Level.SEVERE, "Error when deleting the Service of the job ["
- + getTopologyName() + "] in namespace [" + getNamespace() + "]");
- LOG.log(Level.SEVERE, "Error killing topoogy message:" +
response.message());
+ + getTopologyName() + "] in namespace [" + getNamespace() +
"]");
+ LOG.log(Level.SEVERE, "Error killing topology message:" +
response.message());
KubernetesUtils.logResponseBodyIfPresent(LOG, response);
throw new TopologyRuntimeManagementException(
- KubernetesUtils.errorMessageFromResponse(response));
+ KubernetesUtils.errorMessageFromResponse(response));
}
- } catch (IOException | ApiException e) {
- KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology
service", e);
- return false;
+ } catch (ApiException e) {
+ if (e.getCode() == HTTP_NOT_FOUND) {
+ LOG.log(Level.WARNING, "Tried to delete a non-existent Kubernetes
service for Topology: "
+ + getTopologyName());
+ return;
+ }
+ throw new TopologyRuntimeManagementException("Error deleting topology ["
+ + getTopologyName() + "] Kubernetes service", e);
+ } catch (IOException e) {
+ throw new TopologyRuntimeManagementException("Error deleting topology ["
+ + getTopologyName() + "] Kubernetes service", e);
}
+ LOG.log(Level.INFO, "Headless Service for the Job [" + getTopologyName()
+ + "] in namespace [" + getNamespace() + "] is deleted.");
}
- boolean deleteStatefulSet() {
- try {
- final Response response =
appsClient.deleteNamespacedStatefulSetCall(getTopologyName(),
+ void deleteStatefulSet() {
+ try (Response response =
appsClient.deleteNamespacedStatefulSetCall(getTopologyName(),
getNamespace(), null, null, 0, null,
- KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null,
null).execute();
-
- if (response.isSuccessful()) {
- LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
- + "] in namespace [" + getNamespace() + "] is deleted.");
- return true;
- } else {
+ KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null,
null).execute()) {
+
+ if (!response.isSuccessful()) {
+ if (response.code() == HTTP_NOT_FOUND) {
+ LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet
for Topology: "
+ + getTopologyName());
+ return;
+ }
LOG.log(Level.SEVERE, "Error when deleting the StatefulSet of the job
["
- + getTopologyName() + "] in namespace [" + getNamespace() + "]");
+ + getTopologyName() + "] in namespace [" + getNamespace() +
"]");
LOG.log(Level.SEVERE, "Error killing topology message: " +
response.message());
KubernetesUtils.logResponseBodyIfPresent(LOG, response);
throw new TopologyRuntimeManagementException(
- KubernetesUtils.errorMessageFromResponse(response));
+ KubernetesUtils.errorMessageFromResponse(response));
}
- } catch (IOException | ApiException e) {
- KubernetesUtils.logExceptionWithDetails(LOG, "Error deleting topology",
e);
- return false;
+ } catch (ApiException e) {
+ if (e.getCode() == HTTP_NOT_FOUND) {
+ LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for
Topology: "
+ + getTopologyName());
+ return;
+ }
+ throw new TopologyRuntimeManagementException("Error deleting topology ["
+ + getTopologyName() + "] Kubernetes StatefulSet", e);
+ } catch (IOException e) {
+ throw new TopologyRuntimeManagementException("Error deleting topology ["
+ + getTopologyName() + "] Kubernetes StatefulSet", e);
}
+ LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
+ + "] in namespace [" + getNamespace() + "] is deleted.");
}
protected List<String> getExecutorCommand(String containerId) {